+49 721 9654-724
Blog

Tutorial: Eine natürlichsprachige Schnittstelle für das IIoT mit MCP und Apache StreamPipes

Thumbnail Image

Einführung

Das neue Model Context Protocol (MCP) ermöglicht es LLM-Anwendungen, lokale oder entfernte „Server“ aufzurufen, die Tools und Ressourcen in einer strukturierten, JSON-Schema-gesteuerten Weise bereitstellen.

In diesem Blogbeitrag bauen wir einen einfachen MCP-Server, der den Apache StreamPipes Java-Client verwendet und einige seiner Funktionen für Claude Desktop (und n8n) bereitstellt. Dieser MCP-Server ermöglicht z.B. Chat-Anwendungen wie Claude Desktop die Abfrage von StreamPipes nach verfügbaren Adaptern, das Starten von Pipelines und vieles mehr, alles über Befehle in natürlicher Sprache.

In diesem Tutorial werden wir:

  • Einen Spring Boot MCP-Server aufbauen, der den StreamPipes Java-Client verwendet und Tools für verschiedene StreamPipes-Operationen bereitstellt.
  • Den Server in ein lauffähiges JAR verpacken.
  • Den MCP-Server in der claude_desktop_config.json-Datei registrieren
  • Den MCP-Server so konfigurieren, dass er über HTTP oder Stdio angesprochen werden kann.

Model Context Protocol (MCP)

MCP ist ein Protokoll, das entwickelt wurde, um großen Sprachmodellen (LLMs) die strukturierte Interaktion mit externen Systemen und Diensten zu ermöglichen. Es erlaubt LLMs, Funktionen aufzurufen, Daten abzurufen und Aktionen auszuführen, indem eine Reihe von Tools definiert wird, die mit Befehlen in natürlicher Sprache aufgerufen werden können.

MCP (eingeführt von Anthropic Ende 2024) definiert ein sehr kompaktes Nachrichtenformat – über stdin/stdout, Sockets oder HTTP –, sodass jeder LLM-Agent Tools entdecken, Parameter validieren und sie mit Zustimmung des Nutzers ausführen kann.

Warum StreamPipes und MCP?

Apache StreamPipes ist unser Open-Source-Framework zur Verarbeitung und Analyse von Streaming-Daten aus industriellen Datenquellen wie SPS, Sensoren und IoT-Geräten. Es bietet eine benutzerfreundliche Oberfläche zum Erstellen von Datenpipelines, zur Integration verschiedener Datenquellen und zur Anwendung von Analysen in Echtzeit.

Im Hintergrund bietet StreamPipes viele APIs zur Interaktion mit industriellen Datenquellen, Pipelines für Verarbeitung und Analytik. Darüber hinaus beinhaltet StreamPipes mehrere Client-Bibliotheken für verschiedene Programmiersprachen, einschließlich eines Java-Clients, der es Entwicklern ermöglicht, programmgesteuert mit StreamPipes-Instanzen zu interagieren.

In diesem Experiment erstellen wir einen einfachen MCP-Server, der den StreamPipes Java-Client verwendet und dessen Funktionen für Claude Desktop oder andere MCP-Clients bereitstellt. Durch die Verwendung von MCP können wir die vorhandenen Plattformfunktionen von StreamPipes nutzen und gleichzeitig eine Schnittstelle in natürlicher Sprache bereitstellen, mit der Benutzer mit StreamPipes-Ressourcen interagieren können.

Mögliche Fragen könnten z.B. sein:

  • "Zeige alle Pipelines an, die derzeit in StreamPipes ausgeführt werden."
  • "Starte die Pipeline zur Überwachung der Durchflussrate."
  • "Welche Adapter sind in meiner StreamPipes-Instanz verfügbar?"
  • "Erstelle ein Diagramm, das Durchflussrate und Temperatur aus der Pipeline zur Überwachung der Durchflussrate vergleicht."

Viele weitere Befehle sind möglich, und der MCP-Server kann bei Bedarf erweitert werden, um zusätzliche Funktionen zu unterstützen. In diesem Blogbeitrag konzentrieren wir uns jedoch auf einige wichtige Befehle, um das Konzept zu demonstrieren.

Voraussetzungen

  • Java (17 oder höher) und Maven lokal installiert
  • Eine laufende Apache StreamPipes-Instanz (z.B. die [StreamPipes-Docker-Images]https://streampipes.apache.org/download/) für die lokale Entwicklung verwenden)
  • Claude Desktop oder ein anderes System, das MCP unterstützt (z.B. n8n)

Projektsetup

Wir gehen davon aus, dass StreamPipes bereits lokal oder auf einem Remote-Server läuft. Die StreamPipes-Dokumentation enthält eine Anleitung für den erstmaligen Start.

In diesem Beispiel erstellen wir zunächst eine eigenständige Spring Boot-Anwendung, die als MCP-Server dient. Die Anwendung kommuniziert mit der StreamPipes-Instanz über den StreamPipes Java-Client.

Erstelle ein neues Maven-Projekt mit der folgenden Struktur:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ai.bytefabrik</groupId>
    <artifactId>streampipes-mcp-server-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <streampipes.version>0.98.0-SNAPSHOT</streampipes.version>
        <spring-ai.version>1.0.0</spring-ai.version>
        <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.streampipes</groupId>
            <artifactId>streampipes-client</artifactId>
            <version>${streampipes.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>commons-logging</groupId>
                    <artifactId>commons-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.ai</groupId>
            <artifactId>spring-ai-starter-mcp-server-webmvc</artifactId>
            <version>${spring-ai.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.micrometer</groupId>
                    <artifactId>micrometer-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <repositories>
      <repository>
        <id>apache-snapshots</id>
        <url>https://repository.apache.org/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
          <updatePolicy>always</updatePolicy>
        </snapshots>
      </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                        <configuration>
                            <mainClass>ai.bytefabrik.streampipes.mcp.StreamPipesMcpExampleApplication</mainClass>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <finalName>streampipes-mcp-server</finalName>
    </build>
</project>

Wir binden zwei Abhängigkeiten ein: streampipes-client für den StreamPipes Java-Client, spring-ai-starter-mcp-server-webmvc für die MCP-Serverfunktionalität. Falls der MCP-Server nur im Stdio-Modus verwenden soll, kann stattdessen spring-ai-starter-mcp-server verwendet werden.

Zusätzlich haben wir das Apache StreamPipes-Snapshot-Repository aufgenommen, um sicherzustellen, dass wir die neueste Version des StreamPipes-Clients verwenden, der z.B. die Unterstützung zum Abrufen von Adapterinformationen bietet.

Code

Hauptanwendungsklasse

Erstellen wir nun die Hauptanwendungsklasse, die als Einstiegspunkt für unseren MCP-Server dient.

package ai.bytefabrik.streampipes.mcp;

import ai.bytefabrik.streampipes.mcp.tool.AdapterTool;
import ai.bytefabrik.streampipes.mcp.tool.PipelineTool;
import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.client.StreamPipesCredentials;
import org.apache.streampipes.client.api.IStreamPipesClient;
import org.springframework.ai.support.ToolCallbacks;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.util.List;
import java.util.Optional;

@SpringBootApplication
public class StreamPipesMcpExampleApplication {

  public static void main(String[] args) {
    SpringApplication.run(StreamPipesMcpExampleApplication.class, args);
  }

  @Bean
  public List<ToolCallback> toolCallbacks() {
    var client = makeClient();
    return List.of(
        ToolCallbacks.from(new PipelineTool(client)),
        ToolCallbacks.from(new AdapterTool(client))
    );
  }

  private IStreamPipesClient makeClient() {
    var host     = System.getenv("STREAMPIPES_HOST");
    var port     = Integer.parseInt(System.getenv("STREAMPIPES_PORT"));
    var username = System.getenv("STREAMPIPES_USERNAME");
    var apiKey   = System.getenv("STREAMPIPES_API_KEY");
    var httpsOff = Boolean.parseBoolean(
        Optional.ofNullable(System.getenv("STREAMPIPES_HTTPS_DISABLED"))
                .orElse("false"));
    return StreamPipesClient.create(
        host, port,
        StreamPipesCredentials.withApiKey(username, apiKey),
        httpsOff);
  }
}

In dieser Klasse definieren wir zwei Tools: PipelineTool und AdapterTool, die die Operationen im Zusammenhang mit StreamPipes-Pipelines bzw. -Adaptern behandeln. Darüber hinaus erstellen wir einen StreamPipes-Client mit der Methode makeClient(), der die erforderliche Konfiguration aus Umgebungsvariablen abruft.

Tool-Klassen

Als Nächstes müssen wir die Klassen PipelineTool und AdapterTool implementieren, die die vom MCP-Server bereitgestellten Funktionen definieren.

Spring AI erleichtert die Definition von Tools, indem es eine @Tool-Annotation bereitstellt, mit der wir den Namen, die Beschreibung sowie die Eingabe-/Ausgabeparameter des Tools angeben können.

Die Klasse PipelineTool sieht dann wie folgt aus:

public class PipelineTool {

  private final IStreamPipesClient client;

  public PipelineTool(IStreamPipesClient client) {
    this.client = client;
  }

  @Tool(
      name = "get_all_pipelines",
      description = "Get all available StreamPipes pipelines"
  )
  public List<Pipeline> getPipelines() {
    return client.pipelines().all();
  }

  @Tool(
      name = "get_pipeline_by_id",
      description = "Given the ID of a pipeline, return a detailed description of the pipeline"
  )
  public Pipeline getPipelineById(String id) {
    return client.pipelines()
        .get(id)
        .orElseThrow(() -> new IllegalArgumentException(String.format("Could not find pipeline with id %s", id)));
  }

  @Tool(
      name = "start_pipeline",
      description = "Start the pipeline with the given ID"
  )
  public void startPipeline(String id) {
    var pipeline = getPipelineById(id);
    client.pipelines().start(pipeline);
  }
}

Da der StreamPipes-Client bereits Methoden zur Interaktion mit Pipelines bereitstellt, können wir diese Methoden direkt in unserer Tool-Implementierung verwenden. In diesem Beispiel definieren wir drei Tools, um alle vorhandenen Pipelines abzurufen, eine Pipeline anhand ihrer ID abzurufen und eine Pipeline anhand ihrer ID zu starten. Im GitHub-Repository sind weitere Methoden definiert, um Pipelines zu stoppen und mit Adaptern zu interagieren.

application.yaml

Als Nächstes müssen wir unseren MCP-Server konfigurieren, indem wir eine Datei application.yaml im Verzeichnis src/main/resources erstellen:

logging:
  pattern:
    console:
spring:
  main:
    banner-mode: off
application:
  name: streampipes-mcp-example
ai:
  mcp:
    server:
      name: bytefabrik-streampipes-mcp-server
      version: 0.1.0
      enabled: true
      stdio: true

Standardmäßig führt Spring AI den MCP-Server im Stdio-Modus aus, was bedeutet, dass Befehle von der Standardeingabe gelesen und Antworten an die Standardausgabe geschrieben werden. Stdio erfordert, dass das Protokoll und der Banner-Modus auf leer und aus, bzw. deaktiviert gesetzt sind.

Durch das Einbinden der Abhängigkeit spring-ai-starter-mcp-server-webmvc erhalten wir automatisch alle Funktionen des MCP-Servers. Das bedeutet, sobald wir die Anwendung ausführen, beginnt sie, auf MCP-Befehle über die Standard-Eingabe zu lauschen, startet aber auch einen Webserver, der verwendet werden kann, um über HTTP und SSE mit dem MCP-Server zu interagieren.

Erstellen des MCP-Servers

Um den MCP-Server zu erstellen, verwenden wir Maven, um die Anwendung in eine ausführbare JAR-Datei zu überführen. Durch das Ausführen von mvn clean package wird eine JAR-Datei im Verzeichnis target mit dem Namen streampipes-mcp-server.jar erstellt.

Generieren eines API-Schlüssels

Um einen API-Schlüssel für die StreamPipes-Instanz zu generieren, kann die StreamPipes-Benutzeroberfläche oder die Bytefabrik-Plattform-Benutzeroberfläche verwendet werden. Navigiere zu "Profil -> Einstellungen -> API-Schlüssel" und erstellen Sie einen neuen API-Schlüssel.

Kopiere den API-Schlüssel - wir benötigen ihn im nächsten Schritt!

Verwendung des MCP-Servers mit Claude Desktop

Jetzt müssen wir den MCP-Server noch mit Claude Desktop über Stdio verbinden.

Öffne in Claude Desktop die Einstellungenund navigiere zu Entwickler. Beim Klicken auf Konfiguration bearbeiten kann die folgende Konfiguration zur Datei claude_desktop_config.json hinzugefügt werden:

{
  "mcpServers": {
    "bytefabrik-streampipes-mcp-server": {
      "env": {
        "STREAMPIPES_HOST": "localhost",
        "STREAMPIPES_PORT": "8082",
        "STREAMPIPES_USERNAME": "admin@streampipes.apache.org",
        "STREAMPIPES_API_KEY": "your_api_key_here",
        "STREAMPIPES_HTTPS_DISABLED": "true"
      },
      "command": "java",
      "args": [
        "-jar",
        "path_to/streampipes-mcp-server.jar"
      ]
    }
  }
}

Kopiere den API-Schlüssel aus dem vorherigen Schritt und ersetze your_api_key_here durch den generierten Schlüssel. Außerdem müssen ggf. Host, Port und Benutzernamen angepasst werden.

Starte nun Claude Desktop neu, um die Änderungen zu übernehmen, und stelle dabei sicher, dass beim Start keine Fehler angezeigt werden.

Jetzt können wir den Server verwenden!

Navigiere zum Chatfenster und versuche Folgendes:

List all pipelines that are currently running in StreamPipes.

Claude antwortet mit einer Beschreibung aller Pipelines, die derzeit in der StreamPipes-Instanz ausgeführt werden.

List pipelines in Claude

Wenn der MCP-Server zum ersten Mal verwendet wird, fragt Claude nach der Erlaubnis für den Zugriff auf die StreamPipes-Tools:

Allow Claude to access the StreamPipes tools

Jetzt können wir z.B. Claude auffordern, eine bestimmte Pipeline zu erklären:

Explain a pipeline in Claude

Wir sehen eine ausführliche Beschreibung der Pipeline, einschließlich der Datenquellen, der Konfiguration aller Prozessoren und Senken sowie des aktuellen Status.

Was ist mit Aktionen? Wir können eine Pipeline aus Claude heraus stoppen, indem wir einfach den Namen der Pipeline angeben:

Stop a pipeline in Claude

Claude sucht nun automatisch die Pipeline-ID und stoppt die Pipeline.

Es gibt noch viele weitere Dinge zum ausprobieren - z.B. Aktionen auf Adaptern ausführen.

Verwendung des MCP-Servers mit n8n

Bisher haben wir den Stdio-Modus verwendet, um mit dem MCP-Server zu interagieren. Sehen wir uns nun an, wie wir den MCP-Server in n8n, einem beliebten Open-Source-Tool zur Automatisierung von Workflows, verwenden können.

Zuerst müssen wir den MCP-Server starten, den wir zuvor erstellt haben. Führe nun den folgenden Befehl aus:

java -jar target/streampipes-mcp-server.jar

Standardmäßig hört der MCP-Server auf Port 8080.

Um den MCP-Server mit n8n zu integrieren, müssen wir n8n so konfigurieren, dass es den StreamPipes MCP-Server verwendet.

Füge ein neues "MCP Client Tool" als Tool hinzu. Konfiguriere dann das MCP-Client-Tool mit der IP und dem Port des StreamPipes MCP-Servers, z.B. http://localhost:8080.

n8n StreamPipes MCP config

Jetzt können wir den StreamPipes MCP-Server in n8n verwenden, um mit unserer StreamPipes-Instanz zu interagieren!

Using the StreamPipes MCP server in n8n

Ausblick

Das Model Context Protocol (MCP) ist eine leistungsstarke Möglichkeit, die Fähigkeiten von LLMs zu erweitern, indem ihnen die Interaktion mit externen Systemen und Diensten ermöglicht wird.

In diesem Blogbeitrag haben wir demonstriert, wie man einen einfachen MCP-Server erstellt, der den Apache StreamPipes Java-Client umschließt und dessen Funktionen für Claude Desktop und n8n bereitstellt.

Seid ihr daran interessiert, weitere Funktionen im MCP-Server zu sehen? Lasst es uns wissen!