Reconciler Implementation and Design
Reconciler Functionality
General steps the reconciliation process needs to cover:
- Update the ObservedGeneration and initialize the Status conditions as defined in samplesource_lifecycle.go and samplesource_types.go:
    src.Status.InitializeConditions()
    src.Status.ObservedGeneration = src.Generation
- Create or reconcile the receive adapter, following the Reconcile/Create The Receive Adapter procedure described later.
- If successful, update the Status and MarkDeployed:
    src.Status.PropagateDeploymentAvailability(ra)
- Create or reconcile the SinkBinding for the receive adapter targeting the sink, following the Reconcile/Create The SinkBinding procedure described later.
- MarkSink with the result:
    src.Status.MarkSink(sb.Status.SinkURI)
- Return a new reconciler event stating that the process is done:
    return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SampleSourceReconciled", "SampleSource reconciled: \"%s/%s\"", namespace, name)
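
The ordering of the steps above can be sketched as a single function. This is a minimal illustration, not the actual SampleSource reconciler: `Status`, `Source`, and `Reconciler` are hypothetical stand-ins for the real Knative types, and the two sub-reconcilers are injected as plain functions so that the control flow can be read without a Kubernetes client.

```go
package main

import "fmt"

// All types below are hypothetical stand-ins for the real Knative types,
// reduced to the fields that the steps above touch.

// Status mirrors the parts of the source status updated during reconciliation.
type Status struct {
	ObservedGeneration int64
	Conditions         map[string]bool // condition name -> ready
	SinkURI            string
}

// InitializeConditions resets every tracked condition to "not ready".
func (s *Status) InitializeConditions() {
	s.Conditions = map[string]bool{"Deployed": false, "SinkProvided": false}
}

// Source is a stand-in for the SampleSource resource.
type Source struct {
	Namespace, Name string
	Generation      int64
	Status          Status
}

// Reconciler holds the two sub-reconcilers as plain functions (an assumption
// made only to keep this sketch free of client dependencies).
type Reconciler struct {
	ReconcileReceiveAdapter func(src *Source) (available bool, err error)
	ReconcileSinkBinding    func(src *Source) (sinkURI string, err error)
}

// Reconcile walks through the steps in the order listed above and returns
// the text of the final event on success.
func (r *Reconciler) Reconcile(src *Source) (string, error) {
	// Reset conditions and record the generation being processed.
	src.Status.InitializeConditions()
	src.Status.ObservedGeneration = src.Generation

	// Reconcile the receive adapter and propagate its availability.
	available, err := r.ReconcileReceiveAdapter(src)
	if err != nil {
		return "", fmt.Errorf("reconciling receive adapter: %w", err)
	}
	src.Status.Conditions["Deployed"] = available

	// Reconcile the SinkBinding and record the sink it resolved.
	uri, err := r.ReconcileSinkBinding(src)
	if err != nil {
		return "", fmt.Errorf("reconciling sink binding: %w", err)
	}
	src.Status.SinkURI = uri
	src.Status.Conditions["SinkProvided"] = uri != ""

	// Report that the process is done.
	return fmt.Sprintf("SampleSource reconciled: \"%s/%s\"", src.Namespace, src.Name), nil
}
```

Returning early on the first error leaves the later conditions in their initialized "not ready" state, so a failed receive adapter is never reported alongside a ready sink.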
Reconcile/Create The Receive Adapter
As part of the source reconciliation, we have to create and deploy (and update if necessary) the underlying receive adapter.
- Verify the specified Kubernetes resources are valid, and update the Status accordingly
- Assemble the ReceiveAdapterArgs
    raArgs := resources.ReceiveAdapterArgs{
        EventSource:    src.Namespace + "/" + src.Name,
        Image:          r.ReceiveAdapterImage,
        Source:         src,
        Labels:         resources.Labels(src.Name),
        AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
    }
- Fetch the existing receive adapter deployment
    namespace := owner.GetObjectMeta().GetNamespace()
    ra, err := r.KubeClientSet.AppsV1().Deployments(namespace).Get(expected.Name, metav1.GetOptions{})
- If it doesn't exist, create the deployment
    ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Create(expected)
- Check whether the expected spec differs from the existing one, and update the deployment if required
    } else if r.podSpecImageSync(expected.Spec.Template.Spec, ra.Spec.Template.Spec) {
        ra.Spec.Template.Spec = expected.Spec.Template.Spec
        if ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Update(ra); err != nil {
            return ra, err
        }
- If updated, record the event
    return pkgreconciler.NewEvent(corev1.EventTypeNormal, "DeploymentUpdated", "updated deployment: \"%s/%s\"", namespace, name)
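
The fetch, create, and update steps above form a common get-or-create-or-update pattern. The sketch below shows that pattern in isolation, under loud assumptions: `Deployment`, `fakeClient`, `ErrNotFound`, and `reconcileDeployment` are all hypothetical names invented for this example, the client is an in-memory map rather than the Kubernetes API, and only the image is compared instead of the full pod spec.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound plays the role of the API server's "not found" error.
var ErrNotFound = errors.New("not found")

// Deployment is a hypothetical, stripped-down stand-in for a Deployment.
type Deployment struct {
	Namespace, Name string
	Image           string // the only part of the spec this sketch compares
}

// fakeClient is an in-memory stand-in for a Deployments client.
type fakeClient struct {
	store map[string]Deployment
}

func newFakeClient() *fakeClient { return &fakeClient{store: map[string]Deployment{}} }

func key(ns, name string) string { return ns + "/" + name }

func (c *fakeClient) Get(ns, name string) (Deployment, error) {
	d, ok := c.store[key(ns, name)]
	if !ok {
		return Deployment{}, ErrNotFound
	}
	return d, nil
}

func (c *fakeClient) Create(d Deployment) (Deployment, error) {
	c.store[key(d.Namespace, d.Name)] = d
	return d, nil
}

func (c *fakeClient) Update(d Deployment) (Deployment, error) {
	if _, ok := c.store[key(d.Namespace, d.Name)]; !ok {
		return Deployment{}, ErrNotFound
	}
	c.store[key(d.Namespace, d.Name)] = d
	return d, nil
}

// reconcileDeployment returns the live deployment and a short description
// of what, if anything, was changed.
func reconcileDeployment(c *fakeClient, expected Deployment) (Deployment, string, error) {
	ra, err := c.Get(expected.Namespace, expected.Name)
	switch {
	case errors.Is(err, ErrNotFound):
		// Nothing deployed yet: create it from the expected state.
		if ra, err = c.Create(expected); err != nil {
			return Deployment{}, "", fmt.Errorf("creating deployment: %w", err)
		}
		return ra, "created", nil
	case err != nil:
		return Deployment{}, "", fmt.Errorf("getting deployment: %w", err)
	case ra.Image != expected.Image:
		// The spec drifted: overwrite the differing part and push the update.
		ra.Image = expected.Image
		if ra, err = c.Update(ra); err != nil {
			return Deployment{}, "", fmt.Errorf("updating deployment: %w", err)
		}
		return ra, "updated", nil
	}
	// Already in the expected state: nothing to do.
	return ra, "unchanged", nil
}
```

Calling `reconcileDeployment` repeatedly with the same expected state is idempotent: the first call creates, later calls report "unchanged" until the expected image differs.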
Reconcile/Create The SinkBinding
Instead of giving the details of the sink directly to the receive adapter, use a SinkBinding to bind the receive adapter to the sink.
The steps are almost the same as for the Deployment reconciliation described earlier, but they apply to a different resource, SinkBinding.
- Create a Reference for the receive adapter deployment. This deployment will be the SinkBinding's source:
    tracker.Reference{
        APIVersion: appsv1.SchemeGroupVersion.String(),
        Kind:       "Deployment",
        Namespace:  ra.Namespace,
        Name:       ra.Name,
    }
- Fetch the existing SinkBinding
    namespace := owner.GetObjectMeta().GetNamespace()
    sb, err := r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Get(expected.Name, metav1.GetOptions{})
- If it doesn't exist, create it
    sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Create(expected)
- Check whether the expected spec differs from the existing one, and update the SinkBinding if required
    else if r.specChanged(sb.Spec, expected.Spec) {
        sb.Spec = expected.Spec
        if sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Update(sb); err != nil {
            return sb, err
        }
- If updated, record the event
    return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SinkBindingUpdated", "updated SinkBinding: \"%s/%s\"", namespace, name)
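
The text does not show how `specChanged` decides that two specs differ. One plausible approach, sketched below, is a deep-equality comparison. Everything here is an assumption for illustration: `Reference` and `SinkBindingSpec` are reduced stand-ins for the real types, `deploymentSubject` and `syncSpec` are hypothetical helpers, and the standard library's `reflect.DeepEqual` is used in place of whatever comparison the real reconciler performs.

```go
package main

import "reflect"

// Reference is a hypothetical stand-in for tracker.Reference.
type Reference struct {
	APIVersion, Kind, Namespace, Name string
}

// SinkBindingSpec is a hypothetical, reduced stand-in for the real spec.
type SinkBindingSpec struct {
	Subject Reference // the receive adapter deployment being bound
	SinkRef Reference // the sink that events are delivered to
}

// deploymentSubject builds the reference for a receive adapter deployment.
// "apps/v1" is the group/version string of the apps API group.
func deploymentSubject(namespace, name string) Reference {
	return Reference{
		APIVersion: "apps/v1",
		Kind:       "Deployment",
		Namespace:  namespace,
		Name:       name,
	}
}

// specChanged reports whether the existing spec differs from the expected
// one. Deep equality is an assumption, not the confirmed implementation.
func specChanged(existing, expected SinkBindingSpec) bool {
	return !reflect.DeepEqual(existing, expected)
}

// syncSpec overwrites the existing spec when it differs from the expected
// one and reports whether an update is needed.
func syncSpec(existing *SinkBindingSpec, expected SinkBindingSpec) bool {
	if !specChanged(*existing, expected) {
		return false
	}
	*existing = expected
	return true
}
```

A caller would issue the Update request and record the event only when `syncSpec` returns true, which avoids writing to the API server on every reconcile pass.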