Comment Discord crée des insights à partir de milliards de données ? image de couverture

Comment Discord crée des insights à partir de milliards de données ?

Anagram • Publié le 26 November 2021

Traductions

Chez Discord, l'équipe des données de la plateforme permet à l'organisation d'analyser, de comprendre, et d'exploiter ces dernières pour aider Discord à créer un espace permettant à chacun de trouver son appartenance. Discord utilise des données pour un certain nombre de raisons : identifier les mauvais acteurs et les communautés nocives ; développer ses perspectives concernant les produits critiques et les décisions stratégiques ; et former et évaluer l'efficacité des modèles d'apprentissage automatique (allez voir notre Politique de Confidentialité pour en savoir plus sur les informations que nous collectionnons et comment nous les utilisons !). Sans analyses régulières et rigoureuses de l'utilisation de notre produit, notre capacité à prendre des décisions sur les stratégies de notre entreprise à cette échelle serait vraiment un manque à gagner.

Les données brutes nous parviennent sous forme d'exportations de banques de données de production et de données de télémétrie (plus de 15 mille milliards d'enregistrements faits jusqu'à aujourd'hui et des milliards d'enregistrements journaliers). Lorsque Discord était une petite entreprise et que les cas d'utilisation des données étaient plus simples, il était quelque peu défendable, voire idéal, de calculer manuellement des ensembles de données utiles selon nos besoins. Aujourd'hui, nous traitons des pétaoctets de données avec 30 000 vCPU dans le cloud. Pour que cela soit utile, les données brutes doivent être traitées, privatisées d'après les politiques de données du gouvernement américain, et ensuite transformées en un schéma complexe de milliers de tables créées par ordinateur dans notre entrepôt de plus de 30 pétaoctets (nous utilisons Google BigQuery).

En ce moment, la partie de l'équipe de données de la plateforme responsable du traitement des données brutes et de leur accessibilité, se compose de 8 personnes, bien plus qu'avant. Étant donné la taille de l'équipe relative au reste de Discord, il était important de construire un système libre-service et aussi automatisé que possible. C'est l'histoire de la façon dont nous avons transformé des pétaoctets de données brutes en un entrepôt de données structurées et un système qui les maintient, appelé en interne Derived.

Prérequis et approche

Nous avions besoin d'un système de maintenance d'un graphe acyclique dirigé (DAG) complexe de données précalculées — dans notre cas, cela signifie un DAG de tables dérivées dans notre gros entrepôt BigQuery de données :

  1. Une table dérivée représente essentiellement une transformation de données qui peuvent avoir des tables prédécesseurs dans le DAG comme dépendance d'entrée : en d'autres termes, une définition de table dérivée peut être une instruction SQL SELECT qui redirige vers des données brutes ou d'autres tables de dérivées.
  2. En supposant que le DAG circule de haut en bas, on pourrait imaginer qu'en haut du DAG se trouvent les sources de données brutes et les tables de consultation ; au milieu, un ensemble de tables de données de base "dorées" réutilisables (par exemple, les inscriptions quotidiennes normalisées sur toutes les plateformes) ; et vers le bas du DAG, des tables destinés à être utilisés directement dans des analyses, des outils de BI ou des modèles d'apprentissage automatique.
  3. Le DAG peut contenir des milliers de tables, il doit donc être évolutif.

Bien que le système soit divisé en une série d'étapes livrables, nous voulions que le système final réponde aux exigences suivantes :

  1. Exécuter les mises à jour des tables dès que de nouvelles données sont disponibles (mais pas plus tôt !).
  2. Conserver une piste d'audit des mutations de chaque table.
  3. Inclure des primitives pour alimenter la lignée des données et un catalogue de données.
  4. Modifications intuitives et en libre-service des tables pour les équipes de parties prenantes comme l'ingénierie, la science des données et l'apprentissage automatique.
  5. Permettre l'intégration des contrôles d'accès aux données et prendre en charge l'application évolutive des politiques de confidentialité.
  6. Possibilité d'exporter automatiquement les données dérivées vers les bases de données de production pour les utiliser dans le produit de Discord destiné aux utilisateurs.
  7. Simplicité et facilité d'utilisation dans le contexte de l'environnement de l'infrastructure de Discord.

Bien que des solutions comme dbt, Airflow et Looker résolvent la plupart des problèmes cités ci-dessus, nous avons finalement souhaité une solution plus adaptée qui pourrait bien s'intégrer dans nos systèmes existants et qui nous donnerait la flexibilité d'étendre les cas d'utilisation au-delà de l'analyse.

Nous utilisions déjà Airflow pour organiser des travaux par lots et traiter des jeux de données plus simples, mais nous avons découvert quelques limitations :

  1. Écrire les travaux était compliqué car cela demandait aux utilisateurs d'avoir de solides connaissances sur Python, SQL et Airflow. Cela a enfreint notre prérequis sur le fait que les modifications de DAG devaient être automatisées.
  2. Organiser la planification de requêtes qui dépendaient les unes des autres selon des horaires différents (par exemple, savoir quand mettre à jour une table planifiée chaque semaine et lue à partir d’une table mise à jour mensuellement).
  3. Savoir exactement où insérer votre table intégrée dans le graphique de dépendance et comprendre son impact sur les autres tables n’était pas simple (par exemple, savoir quand vos tables doivent être remplies parce que les jeux de données précédents appartenant à une autre équipe étaient incomplets, introduisaient des données corrompues ou avaient des modifications de données).
  4. L'écriture de la logique pour les constructions de données incrémentielles qui ajoutent ou fusionnent des données dans une table existante était sujette aux erreurs et au copier-coller, surtout si l'on tient compte de toutes les conditions potentielles de reconstruction et de remplissage.

En prenant en compte nos prérequis, en ayant observé les points sensibles et en s'inspirant des solutions existantes, nous avons fait les choix de conception suivants :

  1. Les utilisateurs devront seulement connaître le SQL pour définir des tables dérivées.
  2. Les utilisateurs n'auront pas besoin de connaître la structure spécifique du DAG : le système déduira le DAG à partir du SQL.
  3. Tous les éléments devront être présents dans git pour un historique complet des modifications et une recherche facile des configurations de production actuelles.
  4. Le système intégrera du traitement de données avec nos systèmes existants de privatisation et des politiques gouvernementales sur les données. Nous prenons la vie privée de nos utilisateurs très au sérieux, et il était primordial que tout ce que nous construisions respecte nos stricts contrôles de confidentialité.
  5. L'historique de métadonnées et l'état actuel de chaque table devra être enregistré en un format accessible pour construire des outils de surveillance, de lignée et de performances.
  6. Les opérations de réparations de données (quand il y a des données manquantes) devront être simples et assurer la cohérence des données à travers tout l'entrepôt de données.

Version 1 : Le produit minimum viable

Pour le livrable initial, les objectifs prioritaires les plus élevés étaient d'intégrer les transformations de données dans git, de s'assurer que les données étaient cohérentes dans l'entrepôt et de simplifier les opérations de données. Nous avons construit les éléments suivants :

Le comportement de génération de table spécifié à l'aide de l'une des trois stratégies différentes permet de déterminer comment les tables sont construites, incrémentées et remplies :

Ainsi, Derived est né et s'intègre dans notre architecture comme illustré ci-dessous :

Version 2 : Ergonomie

Le MVP a prouvé la technologie de construction du DAG, des tables et de gestion des données de l'entrepôt, mais les gens en interne avaient du mal à créer de nouvelles tables dérivées sans l'aide des ingénieurs en données puisque le processus était trop compliqué et obscur. Donc pour la prochaine itération, nous nous sommes concentrés sur la création d'une interface d'utilisateurs plus simple pour facilement créer de nouvelles tables et écrire de la documentation juste à côté de leur code.

Fonctionnalités :

Lorsque la définition de la table est fusionnée dans la branche principale, Derived crée la table et la remplit avec toutes les données existantes. Pour chaque exécution ultérieure, il insère une heure de données dans la table existante - une opération de FUSION en termes de BigQuery. En outre, comme vous pouvez le voir ci-dessous, la documentation relative aux définitions des tables et des colonnes vit avec la définition de la table, ce qui minimise la possibilité de dérive entre la fonctionnalité du tableau et la documentation.

columns:
  - [emoji_id, INT64, NOT NULL, 'ID of an emoji']
  - [timestamp, TIMESTAMP, NOT NULL, 'First time a custom emoji was sent']
  - [timestamp_hour, TIMESTAMP, NOT NULL, 'Descriptions are added to database schemas so they appear in the BigQuery UI']

description: 'The table description used in data discovery tooling and generated documentation'
category: 'Category for organizing generated documentation'

strategy: merge # Merge means to incrementally update or insert records into the table based on the merge_keys and merge_mode
merge_keys: [emoji_id] # The table will have at most one record per emoji, if there is no record for the emoji then insert it otherwise ignore new records
merge_mode: keep_oldest # Only record the first time an emoji was sent and ignore all future records
merge_timestamp: timestamp # Determine the first time an emoji was sent based on timestamp in the send_emoji event

schedule: daily # Incrementally update this table once per day
window: daily # Add one day of source data each run

partition_by: timestamp_hour # Partition the BigQuery table by timestamp_hour
cluster_by: [emoji_id] # Sort each partition by emoji_id
timezone: pt # Timestamps are pacific timezone and daily aggregates are pacific timezone days

dataset: emoji_cohorts # BigQuery dataset
sql: |

  SELECT
    emoji_id,
    MIN(timestamp) AS timestamp,
    TIMESTAMP_TRUNC(MIN(timestamp), HOUR) AS timestamp_hour

  FROM events.send_emoji

  WHERE timestamp >= @start_day_utc   # Derived injects timestamps based on data availability of predecessor dependencies
    AND timestamp < @end_day_utc      # start_day_utc and end_day_utc are computed based on incremental, rebuild, and backfill operations

  GROUP BY emoji_id

Un autre avantage de l'adoption de cette interface normalisée est qu'elle fournit une couche d'abstraction qui nous permet d'itérer rapidement sur les systèmes sous-jacents à la configuration sans impacter les équipes.

Version 3 : Automatisation

La version 2 a permis à nos équipes de science des données de créer des centaines de tables sans assistance au cours de la première année. Ce succès a entraîné une nouvelle série de problèmes :

  1. La création de nouvelles tables était simple mais les actualiser demandait de l'aide aux ingénieurs en données : les constructions/reconstructions du DAG n'étaient pas automatisées et devaient être retouchées par des ingénieurs. Bien que ces tâches d'entretien soient relativement simples, elles prennent du temps et sont plus fréquentes à mesure que l'adoption augmente.
  2. Une seule table avec un bug de SQL bloquait tous les processus de toutes les tables et les bugs devenaient plus fréquents à mesure que l'adoption augmentait : Les suites de tests ne testaient pas de manière exhaustive toutes les combinaisons du SQL généré, ni les dépendances entre les tables. Pire, les bugs ne se manifestaient souvent qu'en cours d'exécution en production, en raison d'hypothèses incorrectes sur les données, et qu'il fallait souvent que les ingénieurs en données remplissent à nouveau les données et réparent les tables.
  3. Il était difficile d'utiliser les informations de Derived pour alimenter les fonctionnalités des applications, car ****l'entrepôt de données (BigQuery) n'est pas optimisé pour les exigences de latence en millisecondes des services destinés aux utilisateurs.

La version 3 est donc concentrée sur l'amélioration de la fiabilité des déplacements et l'automatisation de la reconstruction/réparation des tables dérivées. Pour accomplir cela, nous nous sommes concentrés sur l'ergonomie, les tests et l'automatisation générale :

Tester :

Nous voulions que les utilisateurs puissent faire des tests tout en développant de nouvelles tables, nous avons donc mis en place ce qui suit :

Automatisation :

Dans la version 2 de Derived, les métadonnées de la table ont été suivies dans Airflow, ce qui entraîne un certain nombre d'étapes manuelles pendant les opérations de maintenance des données (par exemple, un remblayage nécessite de mettre le DAG en pause, d'éxecuter l'opération, puis de synchroniser l'état réel de la table avec les métadonnées d'Airflow).

Pour automatiser les opérations de données, nous avons déplacé le suivi de l'état des tables hors d'Airflow et dans un journal de métadonnées afin que Derived puisse décider indépendamment quand réparer, reconstruire et ajouter des données aux tables.

Un suivi d'état plus détaillé au niveau de la table débloque également les calculs parallèles afin qu'un processus parent ne se bloque pas lors du séquençage et l'arrangement de plus de 900 tables. Toutes les tables peuvent s'exécuter simultanément et aussi fréquemment que souhaité pour que les aperçus de Derived restent cohérents dans l'entrepôt de données et à jour avec les sources de données. Chaque programme de mise à jour des tables est déployé en tant que son propre "Pod Kubernetes" : lorsqu'un pod démarre, il passe par les étapes suivantes :

Le journal des métadonnées est disponible dans BigQuery et permet une surveillance détaillée, une analyse des performances et une lignée de données. Il répond à des questions de surveillance telles que Quand la table a-t-elle été mise à jour pour la dernière fois ? Les données de la table sont-elles récentes ? Pour l'analyse des performances, nous joignons le journal des métadonnées au fichier information_schema de BigQuery pour obtenir des détails sur l'exécution des requêtes et pour établir des rapports sur les métriques de chaque table. La lignée des données peut être obtenue à partir du journal des métadonnées en suivant les dépendances des prédécesseurs lorsque les tables sont mises à jour, de sorte que la lignée entière puisse être reconstruite en parcourant le journal des métadonnées.

Fonctionnalités de Discord :

Jusqu'à présent, Derived ne fonctionnait que sur des jeux de données BigQuery (un entrepôt de données conçu pour le traitement du Big Data) qui ont souvent des temps de réponse aux requêtes supérieurs à une seconde. Afin d'alimenter les fonctionnalités de l'application, les temps de réponse devaient être beaucoup plus rapides, en particulier pour les fonctionnalités d'apprentissage automatique où le flux de l'application est le suivant : recevoir une demande de l'utilisateur, interroger plusieurs jeux de données de Derived pour créer un ensemble de caractéristiques, faire une prédiction et répondre à l'utilisateur en une seconde. Pour cela, nous avons ajouté une nouvelle option de configuration sur Derived pour exporter automatiquement de BigQuery vers Scylla afin que le jeu de données de Derived soit disponible dans une base de données conçue pour les requêtes de haute performance dans les systèmes en ligne.

Conclusion

Nous utilisons la version 3 en production depuis plus d'un an maintenant et nous avons atteint les sept objectifs initiaux que nous nous étions fixés...

✔️ Les mises à jour de tables devraient s'effectuer aussitôt que les données sont disponibles (mais pas avant) !

✔️ Maintenir une piste d'audit des mutations vers les ensembles de données dérivées.

✔️ Les primitives sont incluses pour alimenter les outils de lignée et de catalogue de données.

✔️ Les modifications du DAG doivent être en libre-service et intuitives pour les équipes des parties prenantes comme l'ingénierie, la science des données et l'apprentissage automatique.

✔️ Connaissance des contrôles d'accès aux données et application évolutive des politiques de gouvernance des données.

✔️ Capable d'exporter automatiquement des données dérivées vers des banques de données de production pour les utiliser dans le produit de Discord destiné aux utilisateurs.

✔️ Simplicité et facilité d'utilisation dans le contexte de l'environnement de Discord.

... mais le voyage est loin d'être terminé, il y a maintenant des milliers de tables en production et l'équipe reçoit souvent des commentaires et des suggestions de personnes engagées en interne qui utilisent Derived pour construire des ensembles de données très complexes. Le système traite quotidiennement des pétaoctets de données provenant d'une multitude de points de données et nous améliorons toujours plus les performances et les fonctionnalités de Derived. Nous travaillons actuellement sur une version 4 - nous sommes très créatifs avec les noms de nos projets - et nous sommes impatients de partager nos connaissances sur les itérations à venir.

Ouf ! C'était beaucoup d'informations et toute une aventure pour l'équipe ! Si travailler avec des données massives vous intéresse, nous vous invitons à consulter notre page d'offres d'emploi et à postuler !