Temporal.io workflow with Openfaas functions
Simple setup to test the recently released workflow engine Temporal (from Uber's Cadence creators) with Openfaas serverless framework in a local k3d kubernetes cluster.
Openfaas is an awesome serverless framework project to spin up functions and api-services quickly in a variety of programing languajes focused (but not limited) to one-shot functions.
Temporal allows for complex workflows hiding most of the complexity behind building scalable distributed applications (from their website).
Install Arkade and required packages
Required packages are easy to install with arkade from Alex Ellis, the creator and founder of Openfaas:
1 curl -sLS https://dl.get-arkade.dev | sudo sh
2
3 arkade get faas-cli
4 arkade get k3d
5
6 export PATH=$PATH:$HOME/.arkade/bin/
Create k8s dev cluster
Set up a kubernetes locally from k3d with a traefik load balancer:
1 k3d cluster create openfaas-temporalio --agents 2 --api-port 6443 --port "8080:80@loadbalancer"
2
3 export KUBECONFIG=$(k3d kubeconfig write openfaas-temporalio)
4
5 kubectl cluster-info
6 kubectl get nodes
Install Openfaas
Launch openfaas in k3d with arkade:
1 arkade install openfaas --basic-auth=false --set=serviceType=ClusterIP --set=ingress.enabled=true --set=ingress.annotations."kubernetes\.io/ingress\.class"=traefik
Default ingress hostname is 'gateway.openfaas.local'
Install Temporal
https://github.com/temporalio/helm-charts#minimal-installation-with-required-dependencies-only
This is the minimal stack to use temporal (without metrics or Elastic Search):
1 git clone https://github.com/temporalio/helm-charts
2
3 cd helm-charts
4
5 helm dependencies update
6
7 # note: there is a pending PR in this repo for the ingress indentation
8 helm upgrade --install \
9 --set server.replicaCount=1 \
10 --set cassandra.config.cluster_size=1 \
11 --set prometheus.enabled=false \
12 --set grafana.enabled=false \
13 --set elasticsearch.enabled=false \
14 --set kafka.enabled=false \
15 --set web.ingress.enabled=true \
16 --set web.ingress.annotations."kubernetes\.io/ingress\.class"=traefik \
17 --set web.ingress.hosts[0]=dashboard.temporal.local \
18 temporaltest . --timeout 15m
Add ingress to /etc/hosts
Instead of using port-forward to access applications i will add the k3d traefik ingress rules to /etc/hosts with this snippet (requires jq to parse json):
1 kubectl get ingress --all-namespaces -o json | jq -r ' "\(.items[].status.loadBalancer.ingress[].ip) \(.items[].spec.rules[].host)"' 2>/dev/null | uniq | sudo tee -a /etc/hosts
Check Openfaas
Test openfaas is working with a sample function from the store:
1 export OPENFAAS_URL=http://gateway.openfaas.local
2
3 faas-cli store deploy Figlet
4 faas-cli list
5 echo "test openfaas" | faas-cli invoke figlet
6 # expected output
7 # _ _ __
8 # | |_ ___ ___| |_ ___ _ __ ___ _ __ / _| __ _ __ _ ___
9 # | __/ _ \/ __| __| / _ \| '_ \ / _ \ '_ \| |_ / _` |/ _` / __|
10 # | || __/\__ \ |_ | (_) | |_) | __/ | | | _| (_| | (_| \__ \
11 # \__\___||___/\__| \___/| .__/ \___|_| |_|_| \__,_|\__,_|___/
12 # |_|
Check Temporal
There is a lot of samples in https://github.com/temporalio/samples-go to show the capabilities of Temporal.
Running the helloworld sample from the temporal docs:
- Create the default temporal namespace with the integrated CLI
1 kubectl --namespace=default get pods -l "app.kubernetes.io/instance=temporaltest"
2 kubectl exec -it services/temporaltest-admintools /bin/bash
3 tctl namespace register default
- Create the worker for the helloworld workflow
1 # this port-forward allows to use the samples directly from the repo (without edit the client.Options values)
2 kubectl port-forward services/temporaltest-frontend-headless 7233:7233 &
3 git clone https://github.com/temporalio/samples-go.git
4 cd samples-go
5 go run helloworld/worker/main.go
6 # [expected output]
7 # 2020/10/26 19:16:13 INFO No logger configured for temporal client. Created default one.
8 # 2020/10/26 19:16:13 INFO Started Worker Namespace default TaskQueue hello-world WorkerID 10709@ws-steran@
- Another shell to execute the workflow activity
1 cd /path/to/samples-go
2 go run helloworld/starter/main.go
3 # [expected output]
4 # 2020/10/26 19:17:08 INFO No logger configured for temporal client. Created default one.
5 # 2020/10/26 19:17:08 Started workflow WorkflowID hello_world_workflowID RunID 2cab124b-0590-4072-b5d3-8b81763c0efc
6 # 2020/10/26 19:17:08 Workflow result: Hello Temporal!
- Finally in the first shell close the worker and the port-forward
1 ^C # ctrl+c close worker
2 fg 1 # get background task
3 ^C # ctrl+c close kubectl forward
Hello world temporal sample with openfaas function
For testing purposes the openfaas function will:
- Register the workflow worker and activity at launch
- Starts a http server
- Expose a http endpoint to execute the workflow activity
Create a new function with the golang-middleware template from store. Golang and java are the two sdks available for temporal at this time:
1 faas template store pull golang-middleware
2
3 faas-cli new --lang golang-middleware temporalio-helloworld --prefix=kammin --gateway=gateway.openfaas.local
Add a goroutine to launch the worker in the main function on the template:
1// template/golang-middleware/main.go
2func main() {
3 readTimeout := parseIntOrDurationValue(os.Getenv("read_timeout"), defaultTimeout)
4 writeTimeout := parseIntOrDurationValue(os.Getenv("write_timeout"), defaultTimeout)
5
6 s := &http.Server{
7 Addr: fmt.Sprintf(":%d", 8082),
8 ReadTimeout: readTimeout,
9 WriteTimeout: writeTimeout,
10 MaxHeaderBytes: 1 << 20, // Max header of 1MB
11 }
12
13 go function.Workflow()
14
15 http.HandleFunc("/", function.Handle)
16
17 listenUntilShutdown(s, writeTimeout)
18}
Write the function handler. This is the same code used before:
1// temporalio-helloworld/handler.go
2package function
3
4import (
5 "context"
6 "fmt"
7 "log"
8 "os"
9
10 "go.temporal.io/sdk/client"
11 "go.temporal.io/sdk/worker"
12
13 "net/http"
14
15 "github.com/temporalio/samples-go/helloworld"
16)
17
18const (
19 // hostport is the host:port which is used if not passed with options.
20 hostport = "temporaltest-frontend-headless.default.svc.cluster.local:7233"
21 // namespace is the namespace name which is used if not passed with options.
22 namespace = "default"
23)
24
25func getEnv(key, fallback string) string {
26 if value, ok := os.LookupEnv(key); ok {
27 return value
28 }
29 return fallback
30}
31
32func Workflow() {
33
34 c, err := client.NewClient(client.Options{HostPort: getEnv("temporal-dns", hostport)})
35 if err != nil {
36 log.Fatalln("Unable to create client", err)
37 }
38 defer c.Close()
39
40 w := worker.New(c, "hello-world", worker.Options{})
41
42 w.RegisterWorkflow(helloworld.Workflow)
43 w.RegisterActivity(helloworld.Activity)
44
45 err = w.Run(worker.InterruptCh())
46 if err != nil {
47 log.Fatalln("Unable to start worker", err)
48 }
49
50}
51
52func Handle(rw http.ResponseWriter, r *http.Request) {
53
54 // The client is a heavyweight object that should be created once per process.
55 c, err := client.NewClient(client.Options{HostPort: getEnv("temporal-dns", hostport)})
56 if err != nil {
57 log.Fatalln("Unable to create client", err)
58 }
59 defer c.Close()
60
61 workflowOptions := client.StartWorkflowOptions{
62 ID: "hello_world_workflowID",
63 TaskQueue: "hello-world",
64 }
65
66 we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, helloworld.Workflow, "Temporal")
67 if err != nil {
68 log.Fatalln("Unable to execute workflow", err)
69 }
70
71 log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
72
73 // Synchronously wait for the workflow completion.
74 var result string
75 err = we.Get(context.Background(), &result)
76 if err != nil {
77 log.Fatalln("Unable get workflow result", err)
78 }
79 log.Println("Workflow result:", result)
80
81 rw.Header().Set("Content-Type", "application/json")
82 rw.Write([]byte(fmt.Sprintf(`{"success":true, "message": "%s"}`, result)))
83
84}
One command is all we need to build, push and deploy our new function:
1 faas-cli up -f temporalio-helloworld.yml --build-arg GO111MODULE=on
If you cant push the image to a remote registry change the openfaas deployment value to 'image_pull_policy=IfNotPresent'. Then do 'faas-cli build' and 'k3d image import' instead. This and using private registries on the openfaas docs.
Check the function is depoyed and the workflow registration in the function logs:
1 faas-cli list
2 # [expected output]
3 # Function Invocations Replicas
4 # figlet 1 1
5 # temporalio-helloworld 0 1
1 faas-cli logs temporalio-helloworld
2 # [expected output]
3 # 2020-10-26T20:04:47Z 2020/10/26 20:04:47 Started logging stderr from function.
4 # 2020-10-26T20:04:47Z 2020/10/26 20:04:47 Started logging stdout from function.
5 # 2020-10-26T20:04:47Z Forking - ./handler []
6 # 2020-10-26T20:04:47Z 2020/10/26 20:04:47 OperationalMode: http
7 # 2020-10-26T20:04:47Z 2020/10/26 20:04:47 Timeouts: read: 10s, write: 10s hard: 10s.
8 # 2020-10-26T20:04:47Z 2020/10/26 20:04:47 Listening on port: 8080
9 # 2020-10-26T20:04:47Z 2020/10/26 20:04:47 Writing lock-file to: /tmp/.lock
10 # 2020-10-26T20:04:47Z 2020/10/26 20:04:47 Metrics listening on port: 8081
11 # 2020-10-26T20:04:47Z 2020/10/26 20:04:47 stdout: 2020/10/26 20:04:47 INFO No logger configured for temporal client. Created default one.
12 # 2020-10-26T20:04:48Z 2020/10/26 20:04:48 stdout: 2020/10/26 20:04:48 INFO Started Worker Namespace default TaskQueue hello-world WorkerID 14@temporalio-helloworld-77d4bc6b99-kt8hj@
Execute the function:
1 curl http://gateway.openfaas.local/function/temporalio-helloworld
2 # [expected output]
3 # {"success":true, "message": "Hello Temporal!"}
Lot of information is provided in the temporal UI:
My next move is to review the temporal docs for complex use cases like versioning workflows or saga patterns.
Uninstall
To uninstall the lab delete the entries from /etc/hosts and the k3d cluster:
1k3d cluster delete openfaas-temporalio