Monitoring Kubernetes en 2025 : Guide Complet Prometheus et Grafana
Guide complet du monitoring Kubernetes en 2025 avec Prometheus, Grafana, OpenTelemetry et eBPF pour une observabilité totale de vos clusters.
Guide complet des architectures événementielles modernes : CQRS, Event Sourcing, Kafka et RabbitMQ pour des systèmes scalables et résilients.
L’architecture événementielle (Event-Driven Architecture – EDA) représente un changement fondamental dans la conception des systèmes distribués modernes. Contrairement aux architectures traditionnelles request-response, l’EDA permet aux composants de réagir de manière asynchrone aux événements, offrant une scalabilité, une résilience et une extensibilité accrues.
En 2025, l’adoption de l’architecture événementielle a atteint un niveau de maturité sans précédent. Des frameworks comme EventSourcingDB 1.0 (lancé en mai 2025) et OpenCQRS 1.0 (octobre 2025) ont consolidé les meilleures pratiques pour l’écosystème JVM. Cette évolution témoigne de la transition des architectures événementielles du statut de pattern émergent à celui de standard industriel pour les systèmes critiques.
Le pattern CQRS sépare les opérations de lecture (Query) et d’écriture (Command) d’un système en utilisant des modèles distincts. Cette séparation permet d’optimiser indépendamment chaque partie selon ses besoins spécifiques : cohérence forte pour les écritures, performance et scalabilité pour les lectures.
La distinction fondamentale repose sur le fait que les commandes modifient l’état du système sans retourner de données, tandis que les requêtes retournent des données sans modifier l’état. Cette séparation conceptuelle permet une architecture où le modèle d’écriture peut se concentrer sur la capture de l’historique des changements, tandis que le modèle de lecture fournit des vues optimisées et personnalisées des données pour des requêtes efficaces.
Dans une architecture CQRS complète, le side écriture utilise généralement un modèle de domaine riche suivant les principes du Domain-Driven Design (DDD). Ce modèle encapsule la logique métier complexe et garantit la cohérence des invariants. Le side lecture, quant à lui, peut utiliser des projections dénormalisées optimisées pour des cas d’usage spécifiques.
// Command Side - Agrégat riche avec logique métier
class OrderAggregate {
private orderId: string;
private items: OrderItem[] = [];
private status: OrderStatus;
private totalAmount: number = 0;
placeOrder(command: PlaceOrderCommand): OrderPlacedEvent {
// Validation métier
if (command.items.length === 0) {
throw new Error('Order must contain at least one item');
}
// Calcul et validation
const total = this.calculateTotal(command.items);
if (total > command.creditLimit) {
throw new Error('Order exceeds credit limit');
}
// Création de l'événement
return new OrderPlacedEvent({
orderId: this.generateOrderId(),
customerId: command.customerId,
items: command.items,
totalAmount: total,
timestamp: new Date()
});
}
private calculateTotal(items: OrderItem[]): number {
return items.reduce((sum, item) => sum + (item.price item.quantity), 0);
}
}
// Query Side - Projection optimisée pour les lectures
interface OrderReadModel {
orderId: string;
customerName: string;
totalAmount: number;
itemCount: number;
status: string;
placedAt: Date;
// Données dénormalisées pour éviter les jointures
shippingAddress: Address;
paymentMethod: string;
}
L’un des défis majeurs du CQRS est la cohérence éventuelle entre les modèles de lecture et d’écriture. Lorsque les bases de données de lecture et d’écriture sont séparées, les données de lecture peuvent ne pas refléter immédiatement les changements les plus récents, ce qui entraîne des données obsolètes.
Pour gérer cette complexité, plusieurs stratégies sont applicables en production :
// Implémentation de read-your-writes consistency
class EventuallyConsistentQueryHandler {
constructor(
private readStore: ReadStore,
private eventStore: EventStore,
private projectionVersion: ProjectionVersionTracker
) {}
async getOrder(orderId: string, minimumVersion?: number): Promise {
const currentVersion = await this.projectionVersion.getVersion(orderId);
// Si une version minimale est requise, attendre la synchronisation
if (minimumVersion && currentVersion < minimumVersion) {
await this.waitForProjection(orderId, minimumVersion);
}
return await this.readStore.getOrder(orderId);
}
private async waitForProjection(
orderId: string,
targetVersion: number,
timeout: number = 5000
): Promise {
const startTime = Date.now();
while (Date.now() - startTime < timeout) {
const currentVersion = await this.projectionVersion.getVersion(orderId);
if (currentVersion >= targetVersion) {
return;
}
await this.sleep(50); // Polling interval
}
throw new Error('Projection synchronization timeout');
}
private sleep(ms: number): Promise {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
L’Event Sourcing persiste l’état d’une entité métier sous forme de séquence d’événements de changement d’état. Au lieu de stocker l’état actuel dans une base de données, chaque changement est enregistré comme un événement immuable. L’état actuel est reconstruit en rejouant tous les événements depuis le début.
Cette approche offre plusieurs avantages critiques pour les systèmes d’entreprise modernes :
L’Event Store est le composant central d’une architecture Event Sourcing. Il doit garantir l’immuabilité, l’ordre et la persistance fiable des événements. Voici une implémentation de référence utilisant PostgreSQL :
-- Schema pour un Event Store PostgreSQL
CREATE TABLE eventstore (
eventid UUID PRIMARY KEY DEFAULT genrandomuuid(),
aggregateid UUID NOT NULL,
aggregatetype VARCHAR(255) NOT NULL,
eventtype VARCHAR(255) NOT NULL,
eventdata JSONB NOT NULL,
metadata JSONB,
version INTEGER NOT NULL,
createdat TIMESTAMP DEFAULT CURRENTTIMESTAMP,
-- Contrainte d'unicité pour éviter les doublons
CONSTRAINT uniqueaggregateversion
UNIQUE (aggregateid, version)
);
-- Index pour les requêtes fréquentes
CREATE INDEX idxaggregateevents
ON eventstore(aggregateid, version);
CREATE INDEX idxeventtype
ON eventstore(eventtype);
CREATE INDEX idxcreatedat
ON eventstore(createdat);
// Implémentation TypeScript de l'Event Store
interface Event {
eventId?: string;
aggregateId: string;
aggregateType: string;
eventType: string;
eventData: any;
metadata?: any;
version: number;
createdAt?: Date;
}
class PostgresEventStore {
constructor(private pool: Pool) {}
async appendEvents(
aggregateId: string,
expectedVersion: number,
events: Event[]
): Promise {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Vérification de la version optimiste
const versionCheck = await client.query(
'SELECT COALESCE(MAX(version), 0) as currentversion FROM eventstore WHERE aggregateid = $1',
[aggregateId]
);
const currentVersion = versionCheck.rows[0].currentversion;
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
Expected version ${expectedVersion} but found ${currentVersion}
);
}
// Insertion des événements
for (let i = 0; i < events.length; i++) {
const event = events[i];
const version = expectedVersion + i + 1;
await client.query(
`INSERT INTO eventstore
(aggregateid, aggregatetype, eventtype, eventdata, metadata, version)
VALUES ($1, $2, $3, $4, $5, $6)`,
[
aggregateId,
event.aggregateType,
event.eventType,
JSON.stringify(event.eventData),
JSON.stringify(event.metadata || {}),
version
]
);
}
await client.query('COMMIT');
// Publication des événements pour les projections
await this.publishEvents(events);
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async getEvents(
aggregateId: string,
fromVersion: number = 0
): Promise {
const result = await this.pool.query(
`SELECT FROM eventstore
WHERE aggregateid = $1 AND version > $2
ORDER BY version ASC`,
[aggregateId, fromVersion]
);
return result.rows.map(row => ({
eventId: row.eventid,
aggregateId: row.aggregateid,
aggregateType: row.aggregatetype,
eventType: row.eventtype,
eventData: row.eventdata,
metadata: row.metadata,
version: row.version,
createdAt: row.createdat
}));
}
private async publishEvents(events: Event[]): Promise {
// Publication vers le message broker pour les projections
// Implémentation selon votre broker (Kafka, RabbitMQ, etc.)
}
}
class ConcurrencyError extends Error {
constructor(message: string) {
super(message);
this.name = 'ConcurrencyError';
}
}
Rejouer des milliers d’événements pour reconstruire un agrégat peut devenir coûteux. Les snapshots permettent de sauvegarder périodiquement l’état d’un agrégat, réduisant le nombre d’événements à rejouer.
class SnapshotStore {
constructor(private pool: Pool) {}
async saveSnapshot(
aggregateId: string,
aggregateType: string,
state: any,
version: number
): Promise {
await this.pool.query(
`INSERT INTO snapshots (aggregateid, aggregatetype, state, version, createdat)
VALUES ($1, $2, $3, $4, CURRENTTIMESTAMP)
ON CONFLICT (aggregateid)
DO UPDATE SET state = $3, version = $4, createdat = CURRENTTIMESTAMP`,
[aggregateId, aggregateType, JSON.stringify(state), version]
);
}
async getSnapshot(aggregateId: string): Promise {
const result = await this.pool.query(
'SELECT FROM snapshots WHERE aggregateid = $1',
[aggregateId]
);
if (result.rows.length === 0) return null;
const row = result.rows[0];
return {
aggregateId: row.aggregate_id,
state: row.state,
version: row.version
};
}
}
class AggregateRepository {
constructor(
private eventStore: PostgresEventStore,
private snapshotStore: SnapshotStore,
private snapshotFrequency: number = 100 // Snapshot tous les 100 événements
) {}
async load(aggregateId: string, AggregateClass: any): Promise {
// Tentative de chargement depuis le snapshot
const snapshot = await this.snapshotStore.getSnapshot(aggregateId);
let aggregate: T;
let fromVersion = 0;
if (snapshot) {
aggregate = Object.assign(new AggregateClass(), snapshot.state);
fromVersion = snapshot.version;
} else {
aggregate = new AggregateClass();
}
// Chargement et replay des événements depuis le snapshot
const events = await this.eventStore.getEvents(aggregateId, fromVersion);
for (const event of events) {
(aggregate as any).applyEvent(event);
}
return aggregate;
}
async save(
aggregate: any,
expectedVersion: number,
events: Event[]
): Promise {
await this.eventStore.appendEvents(
aggregate.id,
expectedVersion,
events
);
const newVersion = expectedVersion + events.length;
// Création d'un snapshot si nécessaire
if (newVersion % this.snapshotFrequency === 0) {
await this.snapshotStore.saveSnapshot(
aggregate.id,
aggregate.constructor.name,
aggregate.getState(),
newVersion
);
}
}
}
Le choix entre Kafka et RabbitMQ est l’une des décisions architecturales les plus importantes dans une architecture événementielle. Ces deux technologies résolvent des problèmes différents et excellent dans des contextes distincts.
Apache Kafka est une plateforme de streaming d’événements distribuée conçue pour ingérer et traiter des quantités massives de données. Kafka suit un modèle de log distribué où les messages sont stockés dans une séquence immuable, permettant aux consommateurs de lire depuis n’importe quel point du flux. Cette architecture rend Kafka particulièrement adapté aux architectures événementielles et aux pipelines de données scalables.
RabbitMQ est un broker de messages généraliste qui supporte des patterns de messagerie flexibles, plusieurs protocoles et un routage complexe. Il suit un système de queuing basé sur un broker où les messages sont poussés vers les consommateurs. Son modèle de queuing est idéal pour le traitement de tâches et les patterns request-response, où les messages doivent être traités de manière fiable et dans l’ordre.
Les deux solutions sont hautement disponibles et fault-tolerant, mais Kafka est mieux équipé pour gérer des scénarios à très grande échelle (pétaoctets de données et billions de messages par jour, distribués sur des centaines voire des milliers de brokers). Kafka peut traiter des millions de messages par seconde avec une latence faible grâce à son architecture de log distribué optimisée pour le throughput séquentiel.
RabbitMQ peut également envoyer des millions de messages par seconde, mais nécessite plusieurs brokers pour y parvenir. Typiquement, les performances de RabbitMQ se situent autour de milliers de messages par seconde et peuvent ralentir si les queues RabbitMQ sont congestionnées. RabbitMQ excelle dans les scénarios nécessitant une faible latence, un routage complexe et une communication entre microservices.
Une différence fondamentale réside dans la gestion des messages consommés. RabbitMQ supprime les messages une fois qu’ils sont acquittés par les consommateurs, ne conservant que les messages non confirmés. Cela garantit que le stockage est utilisé efficacement et que seuls les messages actifs restent dans la queue.
Kafka suit un modèle basé sur la rétention, stockant les messages pendant une période prédéfinie indépendamment de leur consommation. Cette différence impacte la gestion des données et l’accès, rendant Kafka utile pour les architectures événementielles et le stockage de données à long terme. Les consommateurs peuvent relire les messages, permettant le replay d’événements et l’analyse historique.
// Producer Kafka pour Event Sourcing
import { Kafka, Producer, CompressionTypes } from 'kafkajs';
class KafkaEventPublisher {
private producer: Producer;
private kafka: Kafka;
constructor(brokers: string[]) {
this.kafka = new Kafka({
clientId: 'event-sourcing-producer',
brokers: brokers,
retry: {
initialRetryTime: 100,
retries: 8
}
});
this.producer = this.kafka.producer({
idempotent: true, // Garantit exactly-once semantics
maxInFlightRequests: 5,
transactionalId: 'event-sourcing-tx'
});
}
async connect(): Promise {
await this.producer.connect();
}
async publishEvents(events: Event[]): Promise {
const transaction = await this.producer.transaction();
try {
const messages = events.map(event => ({
key: event.aggregateId, // Garantit l'ordre par agrégat
value: JSON.stringify({
eventId: event.eventId,
eventType: event.eventType,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
eventData: event.eventData,
metadata: event.metadata,
version: event.version,
timestamp: event.createdAt
}),
headers: {
'event-type': event.eventType,
'aggregate-type': event.aggregateType
}
}));
await transaction.send({
topic: 'domain-events',
messages: messages,
compression: CompressionTypes.GZIP
});
await transaction.commit();
} catch (error) {
await transaction.abort();
throw error;
}
}
async disconnect(): Promise {
await this.producer.disconnect();
}
}
// Consumer Kafka pour projections
class KafkaProjectionConsumer {
private kafka: Kafka;
private consumer: Consumer;
constructor(brokers: string[], groupId: string) {
this.kafka = new Kafka({
clientId: 'projection-consumer',
brokers: brokers
});
this.consumer = this.kafka.consumer({
groupId: groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000,
maxWaitTimeInMs: 100
});
}
async start(handlers: Map Promise>): Promise {
await this.consumer.connect();
await this.consumer.subscribe({
topic: 'domain-events',
fromBeginning: false
});
await this.consumer.run({
partitionsConsumedConcurrently: 3,
eachMessage: async ({ topic, partition, message }) => {
const eventType = message.headers?.['event-type']?.toString();
const event = JSON.parse(message.value.toString());
const handler = handlers.get(eventType);
if (handler) {
try {
await handler(event);
// Le commit automatique gère l'offset
} catch (error) {
console.error(Error handling event ${eventType}:, error);
// Implémenter retry logic ou dead letter queue
throw error;
}
}
}
});
}
}
// Utilisation
const projectionHandlers = new Map([
['OrderPlaced', async (event) => {
// Mise à jour de la projection de lecture
await updateOrderReadModel(event);
}],
['OrderShipped', async (event) => {
await updateOrderStatus(event.aggregateId, 'SHIPPED');
}]
]);
const consumer = new KafkaProjectionConsumer(
['kafka-broker-1:9092', 'kafka-broker-2:9092'],
'order-projection-group'
);
await consumer.start(projectionHandlers);
// RabbitMQ pour task queues et routing complexe
import amqp, { Connection, Channel } from 'amqplib';
class RabbitMQEventBus {
private connection: Connection;
private channel: Channel;
async connect(url: string): Promise {
this.connection = await amqp.connect(url, {
heartbeat: 60
});
this.channel = await this.connection.createChannel();
await this.channel.prefetch(10); // QoS
// Déclaration des exchanges
await this.channel.assertExchange('domain.events', 'topic', {
durable: true,
autoDelete: false
});
}
async publishEvent(event: Event): Promise {
const routingKey = ${event.aggregateType}.${event.eventType};
const message = JSON.stringify(event);
const published = this.channel.publish(
'domain.events',
routingKey,
Buffer.from(message),
{
persistent: true,
contentType: 'application/json',
timestamp: Date.now(),
messageId: event.eventId,
headers: {
'x-aggregate-id': event.aggregateId,
'x-event-version': event.version
}
}
);
if (!published) {
// Le buffer interne est plein, attendre le drain event
await new Promise(resolve => this.channel.once('drain', resolve));
}
}
async subscribe(
queueName: string,
routingPatterns: string[],
handler: (event: Event) => Promise
): Promise {
// Déclaration de la queue avec options de durabilité
await this.channel.assertQueue(queueName, {
durable: true,
deadLetterExchange: 'domain.events.dlx',
messageTtl: 86400000, // 24 heures
maxLength: 100000
});
// Binding des routing patterns
for (const pattern of routingPatterns) {
await this.channel.bindQueue(queueName, 'domain.events', pattern);
}
// Consommation des messages
await this.channel.consume(queueName, async (msg) => {
if (!msg) return;
try {
const event = JSON.parse(msg.content.toString());
await handler(event);
this.channel.ack(msg);
} catch (error) {
console.error('Error processing message:', error);
// Retry logic avec exponential backoff
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
if (retryCount < 3) {
// Republier avec délai
setTimeout(() => {
this.channel.publish(
'domain.events',
msg.fields.routingKey,
msg.content,
{
...msg.properties,
headers: {
...msg.properties.headers,
'x-retry-count': retryCount
}
}
);
this.channel.ack(msg);
}, Math.pow(2, retryCount) 1000);
} else {
// Envoyer vers la dead letter queue
this.channel.nack(msg, false, false);
}
}
});
}
async close(): Promise {
await this.channel.close();
await this.connection.close();
}
}
// Utilisation avec routing avancé
const eventBus = new RabbitMQEventBus();
await eventBus.connect('amqp://localhost');
// Projection spécifique aux commandes Order
await eventBus.subscribe(
'order-projection-queue',
['Order.OrderPlaced', 'Order.OrderShipped', 'Order.OrderCancelled'],
async (event) => {
await updateOrderProjection(event);
}
);
// Service de notification pour tous les événements
await eventBus.subscribe(
'notification-queue',
['#'], // Wildcard pour tous les événements
async (event) => {
await sendNotification(event);
}
);
Les transactions distribuées dans une architecture événementielle nécessitent le Saga pattern. Une saga est une séquence de transactions locales où chaque transaction met à jour une base de données et publie un événement. Si une étape échoue, des transactions compensatoires sont exécutées pour annuler les modifications précédentes.
// Saga orchestrée pour une commande e-commerce
class OrderSaga {
constructor(
private eventStore: PostgresEventStore,
private commandBus: CommandBus
) {}
async handle(event: OrderPlacedEvent): Promise {
const sagaId = event.orderId;
const saga = new SagaInstance(sagaId);
try {
// Étape 1: Réserver l'inventaire
const inventoryReserved = await this.commandBus.send(
new ReserveInventoryCommand(event.items)
);
saga.recordStep('inventory-reserved', inventoryReserved);
// Étape 2: Autoriser le paiement
const paymentAuthorized = await this.commandBus.send(
new AuthorizePaymentCommand(event.customerId, event.totalAmount)
);
saga.recordStep('payment-authorized', paymentAuthorized);
// Étape 3: Confirmer la commande
await this.commandBus.send(
new ConfirmOrderCommand(event.orderId)
);
saga.complete();
} catch (error) {
// Exécution des compensations dans l'ordre inverse
await this.compensate(saga);
throw error;
}
}
private async compensate(saga: SagaInstance): Promise {
const steps = saga.getCompletedSteps().reverse();
for (const step of steps) {
switch (step.name) {
case 'payment-authorized':
await this.commandBus.send(
new CancelPaymentCommand(step.data.paymentId)
);
break;
case 'inventory-reserved':
await this.commandBus.send(
new ReleaseInventoryCommand(step.data.reservationId)
);
break;
}
}
}
}
Une architecture événementielle distribuée nécessite une observabilité approfondie. Les métriques clés à surveiller incluent :
L’architecture événementielle combinant CQRS, Event Sourcing et message queues représente l’état de l’art pour les systèmes distribués modernes. Ces patterns permettent de construire des applications hautement scalables, résilientes et auditables.
Le choix entre Kafka et RabbitMQ dépend de vos besoins spécifiques : Kafka excelle pour le streaming d’événements à grande échelle et le stockage à long terme, tandis que RabbitMQ offre flexibilité et faible latence pour la messagerie transactionnelle. Dans de nombreux cas, une architecture hybride utilisant les deux technologies peut combiner leurs forces pour des solutions d’entreprise complexes.
La complexité introduite par ces patterns nécessite une équipe expérimentée et des outils de monitoring robustes, mais les bénéfices en termes de scalabilité, de résilience et de capacité d’évolution justifient pleinement cet investissement pour les systèmes critiques.
Cet article est vivant — corrections, contre-arguments et retours de production sont les bienvenus. Trois canaux, choisissez celui qui vous convient.