AWS Lambda et Kinesis Data streams : comment rejouer ses enregistrements en échec ?

Le service serverless AWS Lambda permet, entre autres, de traiter plusieurs événements d’entrée comme, par exemple, des événements Kinesis Data streams. Cependant, il arrive qu’une partie des enregistrements reçus ne puisse pas être entièrement traitée : évènement invalide pour non respect des spécifications, service momentanément indisponible, problème dans le code… Pour assurer la résilience du service, certaines règles peuvent être mises en place pour les rejouer et/ou les sauvegarder temporairement. Nous verrons, dans cet article, différentes configurations de Lambda pouvant répondre à certains cas d’usage.

Paramétrer sa source Kinesis Data streams pour AWS Lambda

Figure 1 : schéma d’architecture

Avant de vous présenter différents cas d’usages qui illustrent la gestion des défaillances entre Kinesis data streams et Lambda, je vais revenir rapidement sur les paramètres qui influent directement sur ce processus, à savoir : 

  • batch size,  
  • maximum batching window, 
  • maximum retry attempts, 
  • error bisect, 
  • report batch item failures
  • destination on failure

Ces paramètres sont configurables au niveau du trigger Kinesis de la Lambda. Si ils vous sont déjà familiers, je vous invite à vous rendre directement à la partie suivante.

Batch size et maximum batching window

Figure 2 : Configuration du “Batch size” et du “Batch Window” sur la console AWS

Il est possible de limiter le nombre d'enregistrements reçu pour une exécution de Lambda en jouant sur deux paramètres : le batch size et le maximum batching window

Le paramètre batch size représente le nombre maximum d'enregistrements qu’une instance d’exécution de fonction Lambda va pouvoir traiter. Par exemple, si la valeur du batch size est configurée sur 1, la Lambda ne pourra traiter qu’un seul enregistrement par exécution. Ainsi, si trois éléments sont envoyés dans le flux Kinesis, trois exécutions de Lambda seront déclenchées. Au contraire, dans notre exemple, si le batch size est à 3, une seule exécution de Lambda va traiter les trois éléments envoyés dans le flux. La limite de ce paramètre est fixée à 10 000.

Figure 3 : exemple d’utilisation du paramètre “batch size”

Un autre moyen de contrôler le nombre maximum d'enregistrements traités par la Lambda est de définir un maximum batching window. Celui-ci va permettre de déterminer pendant combien de temps (en secondes) les enregistrements reçus vont pouvoir s’accumuler pour ensuite être traités par Lambda. Par exemple, si ce paramètre est défini à 30 secondes, et que dix éléments sont envoyés au cours de ce laps de temps, alors une exécution de Lambda va traiter les dix éléments.

Maximum Retry Attempts

Figure 4 : Configuration du paramètre “retry attempts” sur la console AWS

Grâce au paramètre retry attempts, Il est possible de définir un nombre de nouvelles tentatives pour une exécution de Lambda en échec. Cela peut être utile lorsqu’une Lambda appelle un service qui peut être temporairement indisponible, par exemple. Ainsi, même si la première exécution échoue, la seconde peut réussir si le service est de nouveau accessible. La valeur par défaut de ce paramètre est -1, ce qui signifie qu’une Lambda en erreur va rejouer les enregistrements jusqu’à ce qu’ils expirent (rétention du flux Kinesis) ou que leur âge maximum soit atteint (si spécifié). 

Il est important de noter que, dans le cas où ce paramètre n’est pas spécifié, une Lambda peut rejouer les mêmes enregistrements sur une longue période (jusqu’à 365 jours, rétention maximal d’un flux Kinesis) et donc bloquer la consommation des autres enregistrements du même shard Kinesis. Il faut donc porter une attention particulière à ces paramètres et monitorer la métrique CloudWatch IteratorAge pour être alerté si une shard est bloquée.

Maximum Record Age

Figure 5 : Configuration du paramètre “Maximum age of record” sur la console AWS

L’âge maximum d'un enregistrement représente la durée pour laquelle une Lambda peut continuer de traiter un même enregistrement et le rejouer, depuis sa première réception. Ce paramètre est utile pour se protéger du cas expliqué dans le paragraphe précédent.  Voici un exemple illustré de ce paramètre avec une valeur de 5 minutes :

Figure 6 : exemple d’utilisation du paramètre “maximum record age”

Sur l’illustration ci-dessus, on voit qu’un enregistrement reçu à 10h02 va être rejoué pendant 5 minutes et va donc expirer à 10h07.

Voyons maintenant les paramètres qui influent directement sur la gestion des défaillances au sein d’un même lot d'enregistrements.

Bisect on Function Error

Figure 7 : Configuration du paramètre “Split batch on error” sur la console AWS

Si ce paramètre est activé et que la fonction est en erreur, le lot d’enregistrements sera divisé en deux et chaque lot sera rejoué séparément, dans de nouvelles instances d’exécution de Lambda. Ce processus permet d’isoler le ou les enregistrements en erreur.

ℹ️ Ce rejeu est indépendant du paramètre maximum retry attempts que j’ai décris plus haut. 

Voici, ci-dessous, deux exemples illustrés d’une Lambda qui reçoit, en entrée, cinq enregistrements dont un en échec. Ici, seul le paramètre “Split batch on error” est activé.

Figure 8 : exemples d’utilisation du paramètre “Bisect on function error”

Sur le schéma ci-dessus, on peut observer que la séparation des enregistrements ne dépend pas de la position de l’enregistrement en échec. Si le lot contient un enregistrement en échec, il sera divisé en deux, équitablement si le nombre d’enregistrements est pair et avec un enregistrement de plus à la fin du deuxième lot si impair. 

Ce paramètre est utile pour isoler un enregistrement en erreur mais présente un inconvénient majeur s’il est utilisé seul : le rejeu de certains éléments déjà traités avec succès. Ainsi, cela génère du traitement inutile, et donc des coûts supplémentaires, et peut potentiellement provoquer l’apparition de doublons, ce qui peut être un problème si l’idempotence n’est pas gérée dans la suite du traitement en aval.

Report batch item failures

Figure 9 : Configuration du paramètre “Report batch item failures” sur la console AWS

Le paramètre report batch item failures permet de renvoyer une version positive partielle pour certains enregistrements du lot. Ainsi, il permet de diminuer le nombre d’enregistrements rejoués lorsque certains sont en succès. 

Pour traiter les différents batchs provenant du flux Kinesis, Lambda utilise une méthode de checkpointing avec des numéros de séquence. Lorsqu’un lot entier est en succès, le numéro de séquence est mis à jour. L’activation du paramètre reportBatchItemFailures permet de mettre à jour ce numéro de séquence directement en sortie de Lambda, sur l’enregistrement souhaité - même si celui-ci est en erreur. 

Voici un exemple de comportement d’une Lambda avec un lot contenant un enregistrement en échec et avec le paramètre “report batch item failures” activé :

Figure 10 : exemple d’utilisation du paramètre “report batch item failures”

Sur le schéma ci-dessus, on peut voir que les enregistrements en succès (1, 2 et 3) n’ont pas été traités plusieurs fois car, lors de la deuxième exécution, la reprise du lot s’est faite à partir du premier enregistrement en erreur (4). Or, l’élément 5, lui, n’a jamais été traité. Nous allons donc voir comment associer ce paramètre ainsi que le précédent pour, à la fois traiter tous les enregistrements, mais aussi ne pas traiter plusieurs fois un même enregistrement en succès.

Destination on failure 

Figure 11 : Configuration du paramètre “On failure destination” sur la console AWS

Les rejeux d’enregistrements en erreur peuvent ne pas suffire à amener la Lambda en succès - c’est le cas, par exemple, lorsqu’une erreur inattendue se produit, un changement de spécifications, etc.  Le paramètre destination on failure permet de définir un lieu de stockage pour les enregistrements en échec pour, donc, ne pas les perdre. Ces destinations peuvent être : une SNS, une SQS ou un bucket S3. 

Nous verrons l’exemple d’un bucket S3 plus loin dans l’article.

Exemple d’un cas d’usage 

Dans cet exemple, nous allons voir un cas simple d’une Lambda qui reçoit un lot d'enregistrements dont deux avec des formats inattendus. Les comportements souhaités sont les suivants :

  • traitement de la totalité des enregistrements (en échec ou non),
  • les enregistrements en succès doivent être traités exactement une seule fois,
  • les enregistrements en échec doivent finir par être isolés dans leur propre exécution.  

Les paramètres du trigger Kinesis sont donc les suivants : 

  • Split batch on error : yes ;
  • Report batch item failures : yes ;
  • Retry attempts : 0.

Voici le code du handler de la Lambda : 

import json 
from typing import Any 
import base64 

def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]: 	
  number_of_records: int = len(event['Records']) 		
  for record in event['Records']: 		
    try: 			
      curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] 
      encoded_data: str = record['kinesis']['data'] 	
      decoded_data: str = base64.b64decode(encoded_data).decode('utf-8') 
      json.loads(decoded_data) 		
    except Exception: 			
      return {"batchItemFailures":[{"itemIdentifier":curRecordSequenceNumber}]} 	
  return {"batchItemFailures":[]}

Dans le code ci-dessus, on peut voir que, si une erreur intervient lors du traitement d’un enregistrement, une réponse positive partielle est renvoyée car on spécifie le numéro de séquence correspondant au premier enregistrement en échec. 

Voici un schéma illustré des différentes exécutions qui ont lieu avec ces configurations :

Figure 12 : schéma des exécutions de Lambda pour l’exemple donné

Dans le schéma ci-dessus, on peut observer que : 

  • tous les enregistrements ont été traité, 
  • les enregistrements en succès (1, 3, 5) n’ont été traité qu’une seule fois,
  • les enregistrements en échec (2,4) sont isolés dans des leur propre exécutions de Lambda.

Voyons maintenant une alternative avec le paramètre “Maximum Retry attempts” avec une valeur de 3 :

Figure 13 : schéma des exécutions de Lambda pour l’exemple donné

Grâce à l’ajout d’un “Maximum Retry attempts” et grâce aux paramètres décrits plus haut, on observe que seuls les enregistrements en échec sont rejoués 3 fois, une fois isolés dans leur exécution. Ainsi, les exécutions précédentes ne comptent pas pour le “Maximum Retry attempts”.

Ainsi, on peut jouer sur le nombre de tentatives de la Lambda sans impacter le lot entier !

Rejouer les éléments à partir d’un Bucket S3

Un intérêt majeur à isoler les enregistrements en échec dans leur propre exécution de Lambda est de pouvoir les rediriger vers une destination. Comme évoqué plus haut, il existe différentes redirections possibles pour un évènement Kinesis : SNS, SQS et, depuis peu (novembre 2024), S3. Dans le cas d’une destination SNS ou SQS, seules les métadonnées de l’enregistrement sont envoyées. Pour une destination S3, tout l’enregistrement - contenu  du message inclus - est envoyé. Nous allons voir un exemple avec une destination S3.

Voici le schéma d’architecture correspondant à notre situation :

Figure 14 : schéma d’architecture

Sur le schéma ci-dessus, on peut constater qu’il y a deux Lambda : 

  • une avec un trigger Kinesis avec le paramètre “destination on-failure” configuré sur le bucket S3, qui va traiter les messages reçu depuis le stream,
  • une autre sans déclencheur, chargée de lire et traiter les messages en erreur stockés dans le bucket S3. De cette façon, le rejeu des messages en erreur est ponctuel et manuel. 

Les enregistrements en erreur sont stockés dans le bucket S3 dans un fichier de type application/octet-stream et à cet emplacement dans le bucket :  aws/lambda/<id>/shardId-<shard_id>/<year>/<month>/<day>/, avec <id> qui représente l’identifiant contenu dans l’ARN de l’event source mapping du trigger Kinesis de la Lambda.

Un seul fichier va être créé par bloc d’erreur dans le lot d’enregistrement reçu par la Lambda. Voici un exemple qui illustre différents cas : 

Figure 15 : exemple de fichiers S3 créés

Voici le code de la première Lambda, qui possède le trigger Kinesis : 

import json 
from typing import Any 
import base64 

def lambda_handler(event: dict[str, Any], context: Any) -> dict[str, Any]: 	
  for record in event['Records']: 		
    try: 			
      curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] 
      encoded_data: str = record['kinesis']['data'] 	
      decoded_data: str = base64.b64decode(encoded_data).decode('utf-8') 
      json.loads(decoded_data) 		
    except Exception: 			
      return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} 	
  return {"batchItemFailures":[]}

Et voici le code de la seconde Lambda :

import json
import boto3 
import logging
from typing import Any
import base64 

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')

def lambda_handler(event: dict[str, Any], context: Any) -> None:

    bucket_name: str = "test-lafonso"
    s3_files: dict[str, Any] = s3_client.list_objects_v2(Bucket=bucket_name)["Contents"]

    for file in s3_files:
        try: 
            key: str = file['Key']
            file_content: dict[str, Any] = s3_client.get_object(Bucket=bucket_name, Key=key)['Body'].read()
            file_payload: str = json.loads(file_content.decode('utf8'))["payload"]
            kinesis_records: list[dict[str, Any]] = json.loads(file_payload)["Records"]
            
            for kinesis_record in kinesis_records:
                encoded_data: str = kinesis_record['kinesis']['data']
                decoded_data: str = base64.b64decode(encoded_data).decode('utf-8') 
                json.loads(decoded_data) 		

            s3_client.delete_object(Bucket=bucket_name, Key=key)

        except Exception:
            logger.error(f"Error processing file {key} from bucket {bucket_name}.", exc_info=True)

Cette Lambda a pour rôle de lister les fichiers présents dans le bucket S3, de récupérer les enregistrements Kinesis et de les traiter à nouveau. Si le traitement réussit, le fichier est supprimé du bucket

De cette façon, si une exécution de Lambda tombe en erreur, l’enregistrement n’est pas perdu et peut être rejoué à n’importe quel moment !

Conclusion

Pour conclure, nous avons vu qu’il est important de bien comprendre comment fonctionne un déclencheur Kinesis pour une Lambda et comment bien le paramétrer en fonction de nos besoins. Les éléments présentés à travers cet article sont importants pour maintenir de la cohérence dans les données traitées, éviter du traitement inutile et assurer une reprise sur erreur fonctionnelle.

Resources 

https://docs.aws.amazon.com/lambda/latest/dg/kinesis-on-failure-destination.html 

https://aws.amazon.com/fr/about-aws/whats-new/2019/11/aws-lambda-supports-failure-handling-features-for-kinesis-and-dynamodb-event-sources/ 

https://docs.aws.amazon.com/lambda/latest/dg/invocation-retries.html