AI-Agenten mit der Page-Rank-Technologie Pregel skalieren

AI-Agenten mit Page-Rank-Technologie skalieren

Von Pregel zu LangGraph – Grundstein für erfolgreiche Softwarearchitektur in KI-Anwendungen

Avatar von Thomas Blank

Die Entwicklung verteilter Graphenverarbeitung zur LLM-Orchestrierung stellt eine faszinierende Anpassung bewährter Patterns an neue Herausforderungen dar. Insbesondere in Multi-Agenten-Architekturen, in denen komplexe und zyklische Workflows, Persistenz, massive Parallelisierung und vollständige Nachvollziehbarkeit gefordert sind, bieten diese Systeme entscheidende Vorteile.

In diesem Beitrag sehen wir uns an, wie die grundlegenden Konzepte von Googles Pregel-System angepasst wurden, um mit LangGraph eine Architektur für die Gestaltung solcher komplexer LLM-Workflows zu schaffen.

Pregel wurde von Google zur Verarbeitung großflächiger Graphen entwickelt. Pregel selbst ist stark vom Bulk Synchronous Parallel (BSP)-Modell inspiriert, dessen Wurzeln in der Veröffentlichung von Leslie Valiant (u. a. mit dem Turing Award und dem Knuth Preis ausgezeichnet) liegen.

Auf das BSP-Modell gehen zahlreiche Implementierungen von parallelen Algorithmen auf verteilten Systemen zurück (Bulk synchronous parallel). Bei Google wurde Pregel etwa zur Implementierung von PageRank oder Borg – dem geistigen Vorgänger zu Kubernetes – genutzt.

Bevor wir uns Pregel im Detail ansehen, ist es wichtig, die grundlegenden Prinzipien zu verstehen, die dessen Design zugrunde liegen. Werfen wir einen kurzen Blick auf die Kernkonzepte des Bulk-Synchronous-Parallel (BSP)-Modells, das die Basis für Pregel bildet und die Anforderungen moderner Multi-Agenten-Architekturen erfüllt:

  1. Supersteps
    • Berechnungen werden in diskrete Phasen – sogenannte Supersteps – unterteilt. In jedem Superstep führt jeder Prozess (oder Knoten) eine Sequenz von Operationen unabhängig aus.
  2. Kommunikation
    • Nach den Berechnungen in einem Superstep können die Prozesse Nachrichten an andere Prozesse senden. Diese Nachrichten werden jedoch erst im nächsten Superstep zugestellt und verarbeitet, was bedeutet, dass alle Kommunikationen in klar definierten Phasen stattfinden.
  3. Synchronisation:
    • Am Ende jedes Supersteps gibt es eine globale Synchronisationsbarriere, die sicherstellt, dass alle Prozesse ihren aktuellen Superstep abgeschlossen haben, bevor der nächste Superstep beginnt. Das verhindert Probleme wie Race Conditions und Deadlocks.

Das Pregel Pattern

Nachdem wir die Prinzipien des BSP-Modells verstanden haben, können wir nun untersuchen, wie Pregel diese nutzt, um die Verarbeitung großer Graphen zu meistern. Pregel führt ein leistungsfähiges „Denke wie ein Knoten“-Muster ein, das besonders gut für die Orchestrierung von LLM-Interaktionen geeignet ist.

Wir implementieren eine grundlegende Version von Googles Pregel-System und entwickeln sie dann weiter, um auch die Kern-Features von LangGraph abzubilden. Durch dieses praktische Experiment gewinnen wir tiefe Einblicke darin, wie sich Konzepte verteilter Systeme in moderne LLM-Orchestrierungsmuster verwandeln. Wobei LangGraph tatsächlich nicht zwingend im Zusammenhang mit Agentic AI, bzw. überhaupt mit LLMs verwendet werden muss.

Wir erhoffen uns allerdings, durch unseren Modellaufbau ein schärferes Verständnis der grundlegenden Muster zu erlangen – um in Folge besser nachzuvollziehen, wann genau LangGraph exakt diejenigen Eigenschaften abbildet, die wir bei der Entwicklung von AI-getriebenen Systemen oder Integrationen benötigen.

Goodies von Mayflower

Keine Sorge – Hilfe ist nah! Melde Dich unverbindlich bei uns und wir schauen uns gemeinsam an, ob und wie wir Dich unterstützen können.

Die Stärke des Pregel-Modells

Um die Kernkonzepte von Pregel besser zu verstehen, werden wir eine einfache Version des Systems implementieren. Dieser praktische Ansatz wird uns helfen, die Funktionsweise des knotenbasierten Modells zu verstehen und uns auf die Erweiterungen durch LangGraph vorzubereiten.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
// Pregel's core vertex-centric model
interface PregelVertex<VValue, EValue, MValue> {
id: VertexId;
value: VValue;
edges: Map<VertexId, EValue>;
active: boolean;
// Core computation function
compute(messages: MValue[]): {
newValue: VValue;
outgoingMessages: Map<VertexId, MValue>;
voteToHalt: boolean;
};
}
// Message aggregation in Pregel
interface MessageCombiner<MValue> {
combine(messages: MValue[]): MValue;
}
class PregelEngine<VValue, EValue, MValue> {
private vertices: Map<VertexId, PregelVertex<VValue, EValue, MValue>>;
private messages: Map<VertexId, MValue[]>;
private combiner?: MessageCombiner<MValue>;
async processSuperstep(): Promise<void> {
// Process all vertices in parallel
await Promise.all(
Array.from(this.vertices.values())
.filter(v => v.active)
.map(async vertex => {
const messages = this.messages.get(vertex.id) || [];
const combined = this.combiner ?
[this.combiner.combine(messages)] : messages;
const result = await vertex.compute(combined);
this.updateVertexState(vertex.id, result);
})
);
}
private updateVertexState(
vertexId: VertexId,
result: ComputationResult<VValue, MValue>
): void {
const vertex = this.vertices.get(vertexId)!;
vertex.value = result.newValue;
vertex.active = !result.voteToHalt;
// Distribute messages for next superstep
result.outgoingMessages.forEach((message, targetId) => {
if (!this.messages.has(targetId)) {
this.messages.set(targetId, []);
}
this.messages.get(targetId)!.push(message);
});
}
}
// Pregel's core vertex-centric model interface PregelVertex<VValue, EValue, MValue> { id: VertexId; value: VValue; edges: Map<VertexId, EValue>; active: boolean; // Core computation function compute(messages: MValue[]): { newValue: VValue; outgoingMessages: Map<VertexId, MValue>; voteToHalt: boolean; }; } // Message aggregation in Pregel interface MessageCombiner<MValue> { combine(messages: MValue[]): MValue; } class PregelEngine<VValue, EValue, MValue> { private vertices: Map<VertexId, PregelVertex<VValue, EValue, MValue>>; private messages: Map<VertexId, MValue[]>; private combiner?: MessageCombiner<MValue>; async processSuperstep(): Promise<void> { // Process all vertices in parallel await Promise.all( Array.from(this.vertices.values()) .filter(v => v.active) .map(async vertex => { const messages = this.messages.get(vertex.id) || []; const combined = this.combiner ? [this.combiner.combine(messages)] : messages; const result = await vertex.compute(combined); this.updateVertexState(vertex.id, result); }) ); } private updateVertexState( vertexId: VertexId, result: ComputationResult<VValue, MValue> ): void { const vertex = this.vertices.get(vertexId)!; vertex.value = result.newValue; vertex.active = !result.voteToHalt; // Distribute messages for next superstep result.outgoingMessages.forEach((message, targetId) => { if (!this.messages.has(targetId)) { this.messages.set(targetId, []); } this.messages.get(targetId)!.push(message); }); } }
// Pregel's core vertex-centric model
interface PregelVertex<VValue, EValue, MValue> {
    id: VertexId;
    value: VValue;
    edges: Map<VertexId, EValue>;
    active: boolean;
    
    // Core computation function
    compute(messages: MValue[]): {
        newValue: VValue;
        outgoingMessages: Map<VertexId, MValue>;
        voteToHalt: boolean;
    };
}

// Message aggregation in Pregel
interface MessageCombiner<MValue> {
    combine(messages: MValue[]): MValue;
}

class PregelEngine<VValue, EValue, MValue> {
    private vertices: Map<VertexId, PregelVertex<VValue, EValue, MValue>>;
    private messages: Map<VertexId, MValue[]>;
    private combiner?: MessageCombiner<MValue>;
    
    async processSuperstep(): Promise<void> {
        // Process all vertices in parallel
        await Promise.all(
            Array.from(this.vertices.values())
                .filter(v => v.active)
                .map(async vertex => {
                    const messages = this.messages.get(vertex.id) || [];
                    const combined = this.combiner ? 
                        [this.combiner.combine(messages)] : messages;
                    
                    const result = await vertex.compute(combined);
                    this.updateVertexState(vertex.id, result);
                })
        );
    }
    
    private updateVertexState(
        vertexId: VertexId, 
        result: ComputationResult<VValue, MValue>
    ): void {
        const vertex = this.vertices.get(vertexId)!;
        vertex.value = result.newValue;
        vertex.active = !result.voteToHalt;
        
        // Distribute messages for next superstep
        result.outgoingMessages.forEach((message, targetId) => {
            if (!this.messages.has(targetId)) {
                this.messages.set(targetId, []);
            }
            this.messages.get(targetId)!.push(message);
        });
    }
}

Diese Grundlage führt mehrere Schlüsselkonzepte ein, die für das Verständnis von LangGraph entscheidend sein werden:

  1. Zustandsverwaltung: Jeder Knoten verwaltet seinen eigenen Zustand (State) und kann ihn basierend auf Berechnungsergebnissen aktualisieren. Das spiegelt wider, wie LangGraph-Knoten den Gesprächszustand und Kontext verwalten.
  2. Nachrichtenaustausch: Knoten kommunizieren durch Nachrichten, wodurch Informationen durch den Graphen fließen können. Dies entwickelt sich zu LangGraphs Zustandsübergängen und Kontextaktualisierungen.
  3. Synchronisation: Das Superstep-Modell gewährleistet eine koordinierte Verarbeitung, die LangGraph für sequentielle LLM-Interaktionen adaptiert.

LangGraphs Adaption

Von Knotenwerten zu reichhaltigem Zustand

Trotz seiner Stärken hat Pregel Einschränkungen, insbesondere in Bezug auf Flexibilität und Anpassungsfähigkeit an dynamische Workflows. Hier kommt LangGraph ins Spiel, das Pregels Fähigkeiten erweitert, um den Anforderungen der modernen LLM-Orchestrierung gerecht zu werden. Versuchen wir doch, auch diese in unserem Code abzubilden:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
interface LangGraphState {
messages: Array<{
role: 'user' | 'assistant' | 'system';
content: string;
timestamp: Date;
}>;
context: {
workflow: {
currentNode: string;
previousNodes: string[];
startTime: Date;
stepCount: number;
};
business: Record<string, any>;
memory: Record<string, any>;
};
metadata: {
processingStats: {
nodeExecutions: Record<string, number>;
totalTokensUsed: number;
totalProcessingTime: number;
};
lastError?: {
nodeId: string;
error: Error;
timestamp: Date;
};
};
}
interface LangNode<State> {
id: string;
process: (state: State) => Promise<NodeResult<State>>;
errorHandler?: ErrorHandler<State>;
}
interface NodeResult<State> {
newState: State;
transition?: string;
metadata: {
processingTime: number;
llmTokensUsed?: number;
toolCalls?: ToolCallMetadata[];
};
}
class LangGraphEngine<State> {
private nodes: Map<string, LangNode<State>>;
private transitionRules: Map<string, TransitionRule<State>>;
private stateManager: StateManager<State>;
async executeWorkflow(
initialState: State,
startNodeId: string
): Promise<State> {
let currentNodeId = startNodeId;
let currentState = initialState;
while (currentNodeId) {
const node = this.nodes.get(currentNodeId)!;
try {
// Process node with state tracking
const start = performance.now();
const result = await node.process(currentState);
const duration = performance.now() - start;
// Update state with processing metadata
currentState = this.stateManager.updateState(
result.newState,
{
nodeId: currentNodeId,
duration,
metadata: result.metadata
}
);
// Determine next node using transition rules
currentNodeId = await this.determineNextNode(
currentNodeId,
currentState
);
} catch (error) {
if (node.errorHandler) {
currentState = await node.errorHandler(error, currentState);
currentNodeId = await this.determineNextNode(
currentNodeId,
currentState,
true
);
} else {
throw error;
}
}
}
return currentState;
}
private async determineNextNode(
currentNodeId: string,
state: State,
isErrorPath: boolean = false
): Promise<string | null> {
const rules = this.transitionRules.get(currentNodeId);
if (!rules) return null;
if (isErrorPath && rules.onError) {
return rules.onError;
}
for (const [targetNode, condition] of rules.transitions) {
if (await condition(state)) {
return targetNode;
}
}
return null;
}
}
interface LangGraphState { messages: Array<{ role: 'user' | 'assistant' | 'system'; content: string; timestamp: Date; }>; context: { workflow: { currentNode: string; previousNodes: string[]; startTime: Date; stepCount: number; }; business: Record<string, any>; memory: Record<string, any>; }; metadata: { processingStats: { nodeExecutions: Record<string, number>; totalTokensUsed: number; totalProcessingTime: number; }; lastError?: { nodeId: string; error: Error; timestamp: Date; }; }; } interface LangNode<State> { id: string; process: (state: State) => Promise<NodeResult<State>>; errorHandler?: ErrorHandler<State>; } interface NodeResult<State> { newState: State; transition?: string; metadata: { processingTime: number; llmTokensUsed?: number; toolCalls?: ToolCallMetadata[]; }; } class LangGraphEngine<State> { private nodes: Map<string, LangNode<State>>; private transitionRules: Map<string, TransitionRule<State>>; private stateManager: StateManager<State>; async executeWorkflow( initialState: State, startNodeId: string ): Promise<State> { let currentNodeId = startNodeId; let currentState = initialState; while (currentNodeId) { const node = this.nodes.get(currentNodeId)!; try { // Process node with state tracking const start = performance.now(); const result = await node.process(currentState); const duration = performance.now() - start; // Update state with processing metadata currentState = this.stateManager.updateState( result.newState, { nodeId: currentNodeId, duration, metadata: result.metadata } ); // Determine next node using transition rules currentNodeId = await this.determineNextNode( currentNodeId, currentState ); } catch (error) { if (node.errorHandler) { currentState = await node.errorHandler(error, currentState); currentNodeId = await this.determineNextNode( currentNodeId, currentState, true ); } else { throw error; } } } return currentState; } private async determineNextNode( currentNodeId: string, state: State, isErrorPath: boolean = false ): Promise<string | null> { const rules = this.transitionRules.get(currentNodeId); if (!rules) return null; if (isErrorPath && rules.onError) { return rules.onError; } for (const [targetNode, condition] of rules.transitions) { if (await condition(state)) { return targetNode; } } return null; } }
interface LangGraphState {
    messages: Array<{
        role: 'user' | 'assistant' | 'system';
        content: string;
        timestamp: Date;
    }>;
    context: {
        workflow: {
            currentNode: string;
            previousNodes: string[];
            startTime: Date;
            stepCount: number;
        };
        business: Record<string, any>;
        memory: Record<string, any>;
    };
    metadata: {
        processingStats: {
            nodeExecutions: Record<string, number>;
            totalTokensUsed: number;
            totalProcessingTime: number;
        };
        lastError?: {
            nodeId: string;
            error: Error;
            timestamp: Date;
        };
    };
}

interface LangNode<State> {
    id: string;
    process: (state: State) => Promise<NodeResult<State>>;
    errorHandler?: ErrorHandler<State>;
}

interface NodeResult<State> {
    newState: State;
    transition?: string;
    metadata: {
        processingTime: number;
        llmTokensUsed?: number;
        toolCalls?: ToolCallMetadata[];
    };
}

class LangGraphEngine<State> {
    private nodes: Map<string, LangNode<State>>;
    private transitionRules: Map<string, TransitionRule<State>>;
    private stateManager: StateManager<State>;
    
    async executeWorkflow(
        initialState: State,
        startNodeId: string
    ): Promise<State> {
        let currentNodeId = startNodeId;
        let currentState = initialState;
        
        while (currentNodeId) {
            const node = this.nodes.get(currentNodeId)!;
            
            try {
                // Process node with state tracking
                const start = performance.now();
                const result = await node.process(currentState);
                const duration = performance.now() - start;
                
                // Update state with processing metadata
                currentState = this.stateManager.updateState(
                    result.newState,
                    {
                        nodeId: currentNodeId,
                        duration,
                        metadata: result.metadata
                    }
                );
                
                // Determine next node using transition rules
                currentNodeId = await this.determineNextNode(
                    currentNodeId,
                    currentState
                );
                
            } catch (error) {
                if (node.errorHandler) {
                    currentState = await node.errorHandler(error, currentState);
                    currentNodeId = await this.determineNextNode(
                        currentNodeId,
                        currentState,
                        true
                    );
                } else {
                    throw error;
                }
            }
        }
        
        return currentState;
    }
    
    private async determineNextNode(
        currentNodeId: string,
        state: State,
        isErrorPath: boolean = false
    ): Promise<string | null> {
        const rules = this.transitionRules.get(currentNodeId);
        if (!rules) return null;
        
        if (isErrorPath && rules.onError) {
            return rules.onError;
        }
        
        for (const [targetNode, condition] of rules.transitions) {
            if (await condition(state)) {
                return targetNode;
            }
        }
        
        return null;
    }
}

Wir beobachten hier folgende Anpassungen an das Pregel-Modell:

  • Pregels einfacher Knotenwert wird zu LangGraphs reichhaltigem Zustandsobjekt, was wir – ähnlich wie beim Redux-Pattern – bei allen einzelnen Operationen zu Rate ziehen. Und das – ähnlich wie bei einem Event Sourcing Pattern – mit Metadaten versehen werden kann, um den Ausführungsfortschritt und Status zu verfolgen.
  • Nachrichtenverlauf und Kontext sind „first-class“-Bestandteile des Zustandsobjekts.
  • Metadaten verfolgen Ausführungsfortschritt und Status ähnlich wie bei einem Event Sourcing Pattern.

Von Nachrichtenübermittlung zu Zustandsaktualisierungen

Während Pregel Nachrichtenübermittlung zur Koordination nutzt, verwaltet LangGraph ein umfangreiches Zustandsobjekt:

  • Pregels expliziter Nachrichtenaustausch wird zu Zustandsübergängen, ähnlich wie bei einer Finite State Machine (FSM).
  • Nachrichten sind somit Teil des Zustands selbst.
  • Übergänge werden nun durch Zustandsbedingungen statt einzig durch direkte Nachrichten bestimmt.
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
interface LangGraphState {
messages: Array<{
role: 'user' | 'assistant' | 'system';
content: string;
timestamp: Date;
}>;
context: {
workflow: {
currentNode: string;
previousNodes: string[];
startTime: Date;
stepCount: number;
};
business: Record<string, any>;
memory: Record<string, any>;
};
metadata: {
processingStats: {
nodeExecutions: Record<string, number>;
totalTokensUsed: number;
totalProcessingTime: number;
};
lastError?: {
nodeId: string;
error: Error;
timestamp: Date;
};
};
}
class StateManager<State> {
private stateHistory: Array<{
state: State;
timestamp: Date;
nodeId: string;
}> = [];
updateState(
newState: State,
metadata: StateUpdateMetadata
): State {
// Create immutable state update
const updatedState = {
...newState,
context: {
...newState.context,
workflow: {
...newState.context.workflow,
stepCount: newState.context.workflow.stepCount + 1,
}
},
metadata: {
...newState.metadata,
processingStats: {
...newState.metadata.processingStats,
totalProcessingTime:
newState.metadata.processingStats.totalProcessingTime +
metadata.duration
}
}
};
// Record state history
this.stateHistory.push({
state: updatedState,
timestamp: new Date(),
nodeId: metadata.nodeId
});
return updatedState;
}
async persistState(state: State): Promise<void> {
// Implement state persistence logic
}
getStateHistory(
startTime?: Date,
endTime?: Date
): Array<{state: State; timestamp: Date; nodeId: string}> {
return this.stateHistory.filter(entry =>
(!startTime || entry.timestamp >= startTime) &&
(!endTime || entry.timestamp <= endTime)
);
}
}
interface LangGraphState { messages: Array<{ role: 'user' | 'assistant' | 'system'; content: string; timestamp: Date; }>; context: { workflow: { currentNode: string; previousNodes: string[]; startTime: Date; stepCount: number; }; business: Record<string, any>; memory: Record<string, any>; }; metadata: { processingStats: { nodeExecutions: Record<string, number>; totalTokensUsed: number; totalProcessingTime: number; }; lastError?: { nodeId: string; error: Error; timestamp: Date; }; }; } class StateManager<State> { private stateHistory: Array<{ state: State; timestamp: Date; nodeId: string; }> = []; updateState( newState: State, metadata: StateUpdateMetadata ): State { // Create immutable state update const updatedState = { ...newState, context: { ...newState.context, workflow: { ...newState.context.workflow, stepCount: newState.context.workflow.stepCount + 1, } }, metadata: { ...newState.metadata, processingStats: { ...newState.metadata.processingStats, totalProcessingTime: newState.metadata.processingStats.totalProcessingTime + metadata.duration } } }; // Record state history this.stateHistory.push({ state: updatedState, timestamp: new Date(), nodeId: metadata.nodeId }); return updatedState; } async persistState(state: State): Promise<void> { // Implement state persistence logic } getStateHistory( startTime?: Date, endTime?: Date ): Array<{state: State; timestamp: Date; nodeId: string}> { return this.stateHistory.filter(entry => (!startTime || entry.timestamp >= startTime) && (!endTime || entry.timestamp <= endTime) ); } }
interface LangGraphState {
    messages: Array<{
        role: 'user' | 'assistant' | 'system';
        content: string;
        timestamp: Date;
    }>;
    context: {
        workflow: {
            currentNode: string;
            previousNodes: string[];
            startTime: Date;
            stepCount: number;
        };
        business: Record<string, any>;
        memory: Record<string, any>;
    };
    metadata: {
        processingStats: {
            nodeExecutions: Record<string, number>;
            totalTokensUsed: number;
            totalProcessingTime: number;
        };
        lastError?: {
            nodeId: string;
            error: Error;
            timestamp: Date;
        };
    };
}

class StateManager<State> {
    private stateHistory: Array<{
        state: State;
        timestamp: Date;
        nodeId: string;
    }> = [];
    
    updateState(
        newState: State,
        metadata: StateUpdateMetadata
    ): State {
        // Create immutable state update
        const updatedState = {
            ...newState,
            context: {
                ...newState.context,
                workflow: {
                    ...newState.context.workflow,
                    stepCount: newState.context.workflow.stepCount + 1,
                }
            },
            metadata: {
                ...newState.metadata,
                processingStats: {
                    ...newState.metadata.processingStats,
                    totalProcessingTime: 
                        newState.metadata.processingStats.totalProcessingTime +
                        metadata.duration
                }
            }
        };
        
        // Record state history
        this.stateHistory.push({
            state: updatedState,
            timestamp: new Date(),
            nodeId: metadata.nodeId
        });
        
        return updatedState;
    }
    
    async persistState(state: State): Promise<void> {
        // Implement state persistence logic
    }
    
    getStateHistory(
        startTime?: Date,
        endTime?: Date
    ): Array<{state: State; timestamp: Date; nodeId: string}> {
        return this.stateHistory.filter(entry => 
            (!startTime || entry.timestamp >= startTime) &&
            (!endTime || entry.timestamp <= endTime)
        );
    }
}

Von Supersteps zur sequentiellen Verarbeitung

  • Pregels synchronisierte Supersteps werden zu sequentiellen LLM-Interaktionen. Somit wird eine vorhersehbare Abfolge von Operationen gewährleistet.
  • Zustandsaktualisierungen erfolgen atomar nach jedem Verarbeitungsschritt, was eine klare Trennung zwischen den einzelnen Schritten garantiert. Abhängigkeiten zwischen den Schritten werden somit expliziter und können simpler aufgelöst werden als in einem asynchronem System mit Nachrichtenaustausch zwischen den Knoten des Graphen. Dies erlaubt uns auch Fehler besser zu behandeln und auf Unvorhergesehenes zu reagieren. Wir wissen, welcher Schritt ggf. fehlgeschlagen ist, welcher Zustand vorher herrschte (und welcher danach), wenn der Fehler auftrat. Zusammen mit den Messages haben wir damit also alles parat, um den Verlauf nachzuvollziehen. Somit sind bspw. automatische Wiederholungen oder Schleifen schnell trivial abzubilden.
  • Der Ausführungsablauf ist deterministischer und nachvollziehbarer, richtet sich eben an explizit festgelegte „Regeln“ bzw. Bedingungen, die wir an unserem State (inkl. der Messages!) stellen. Somit könnten wir beispielsweise Anpassungen an einzelnen Knoten oder an einzelnen “Regeln” direkt Testen, ohne alle anderen Abläufe im Vorfeld immer neu auszuführen – alles was wir dafür bräuchten ist ein Beispiel-State, von dem aus wir dann unsere Anpassungen schlicht in den nächsten “Superstep” schickten. Super, wenn wir also mit TDD arbeiten wollen! Abgesehen von betrieblichen Kosten bei der Integration von LLMs (Token-Usage, Server-Setup, etc.) würde uns somit auch eine Menge unnötiger Wartezeit erspart.
  • Pause und Wiederaufnahme von Workflows sind möglich, ohne dass der gesamte Workflow neu gestartet werden muss oder dass der Zustand verloren geht oder ungültig wird. Nicht nur in Fehlerfällen können wir das Wissen des aktuellen States benutzen – auch solche spannenden Features wie „Human-in-the-Loop“ (oder anderer Integrationen mit „Drittsystemen“) ließen sich darauf aufbauen. Dazu können wir uns später noch mal genauer ein paar Beispiele ausdenken und erproben.

Zusammenfassung

Nachdem wir die Anpassungen untersucht haben, die LangGraph einführt, fassen wir den Weg von Pregel zu LangGraph zusammen. Durch den Aufbau einer einfachen Pregel-Implementierung und deren Erweiterung konnten wir sehen, wie diese architektonischen Muster komplexe LLM-Workflows effektiv orchestrieren.

Verarbeitungsmodell

  • Pregel: Parallele Vertex-Verarbeitung mit synchronisierten “Supersteps”
  • LangGraph: Sequentielle Knotenausführung mit zustandsgesteuerter Koordination

Zustandsverwaltung

  • Pregel: Verteilter Zustand mit Nachrichtenübermittlung
  • LangGraph: Zentralisierter Zustand um reichhaltigem Kontext und Historie erweitert

Fehlerbehandlung

  • Pregel: Grundlegende Fehlertoleranz durch Checkpointing und gewisser Autonomie einzelner Knoten
  • LangGraph: Ausgefeilte Fehlerbehebung mit Historien, Metadaten, und daraus ableitbaren individuellen Schritten (bspw. Zustandsrücksetzung, automatischer Retry, etc.).

Oder, wie LangGraph es in seiner eigenen Dokumentation erklärt:

LangGraph’s underlying graph algorithm uses message passing to define a general program. When a Node completes its operation, it sends messages along one or more edges to other node(s). These recipient nodes then execute their functions, pass the resulting messages to the next set of nodes, and the process continues. Inspired by Google’s Pregel system, the program proceeds in discrete „super-steps.“

Fazit

Es lässt sich also festhalten, dass LangGraphs Architektur eine durchdachte Weiterentwicklung der Pregel-Konzepte darstellt, die auf die einzigartigen Herausforderungen der LLM-Integration – insbesondere in der Entwicklung von agentischen AI-Systemen – zugeschnitten ist. Durch das Verständnis dieser grundlegenden Muster streben wir danach, robuste und wartbare LLM-Anwendungen zu erstellen, die das Beste aus beiden Welten vereinen.

Ein nächster Schritt wäre es, fortgeschrittene State-Management-Muster zu untersuchen, einschließlich Persistenz-Strategien, Builder-Pattern, Tests, und Techniken zur Handhabung komplexer Workflows wie etwa dem “Human-in-the-Loop” (HITL).

Goodies von Mayflower

Keine Sorge – Hilfe ist nah! Melde Dich unverbindlich bei uns und wir schauen uns gemeinsam an, ob und wie wir Dich unterstützen können.

Geld verdienen mit AI

Avatar von Thomas Blank

Kommentare

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert

Für das Handling unseres Newsletters nutzen wir den Dienst HubSpot. Mehr Informationen, insbesondere auch zu Deinem Widerrufsrecht, kannst Du jederzeit unserer Datenschutzerklärung entnehmen.