Sunday, April 20, 2014

Broadcasting PostgreSQL NOTIFY messages to WebSocket Clients

In this post, I look at how to broadcast NOTIFY messages from a Postgres database to WebSocket clients using Spring MVC 4.

Source code here

Basically, if you change data in your database, it can notify web browser clients, without polling.

Technologies used:


The system works like this:

Client subscribes to a WebSocket topic...

NOTIFY event on database server ->
  PGNotificationListener on web server ->
      Send Websocket notification on server ->
         Receive Websocket event on browser. 

With the code below, if you call NOTIFY dml_events, 'some message'; in Postgres, it will be broadcast to all WebSocket clients.

Follow this answer  regarding proper listener setup

Project Structure:



pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.databasepatterns</groupId>
    <artifactId>pg-notify-websocket</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>
    <url>http://blog.databasepatterns.com/2014/04/postgresql-nofify-websocket-spring-mvc.html</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>4.0.5.RELEASE</spring.version>
    </properties>

    <dependencies>

        <dependency>
            <!-- PostgreSQL JDBC driver -->
            <groupId>com.impossibl.pgjdbc-ng</groupId>
            <artifactId>pgjdbc-ng</artifactId>
            <version>0.3</version>
            <classifier>complete</classifier>
        </dependency>

        <dependency>
            <!-- Your JSON library -->
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-messaging</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-websocket</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>

    </dependencies>

    <build>

        <plugins>

            <plugin>
                <groupId>org.apache.tomcat.maven</groupId>
                <artifactId>tomcat7-maven-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <path>/</path>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.apache.tomcat.embed</groupId>
                        <artifactId>tomcat-embed-websocket</artifactId>
                        <version>7.0.52</version>
                    </dependency>
                </dependencies>
            </plugin>

        </plugins>

    </build>

</project>

PGNotifyToWebSocket.java:

import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;

import javax.sql.DataSource;
import java.sql.Statement;


/**
 * @since 7/22/2014
 */
public class PGNotifyToWebSocket {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    private PGConnection pgConnection;

    public PGNotifyToWebSocket(DataSource dataSource) throws Throwable {

        pgConnection = (PGConnection) dataSource.getConnection();

        pgConnection.addNotificationListener(new PGNotificationListener() {
            @Override
            public void notification(int processId, String channelName, String payload) {
                messagingTemplate.convertAndSend("/channels/" + channelName, payload);
            }
        });
    }

    public void init() throws Throwable {

        Statement statement = pgConnection.createStatement();
        statement.execute("LISTEN dml_events");
        statement.close();
    }

    public void destroy() throws Throwable {

        Statement statement = pgConnection.createStatement();
        statement.execute("UNLISTEN dml_events");
        statement.close();
    }
}
mvc-dispatcher-servlet.xml:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:websocket="http://www.springframework.org/schema/websocket"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket.xsd
        ">

    <context:property-placeholder/>

    <context:component-scan base-package="com.databasepatterns"/>

    <mvc:annotation-driven/>

    <mvc:view-controller path="/" view-name="/WEB-INF/index.jsp"/>

    <bean class="com.impossibl.postgres.jdbc.PGDataSource" id="dataSource">
        <property name="host" value="${PGHOST:localhost}"/>
        <property name="database" value="${PGDATABASE:postgres}"/>
        <property name="password" value="${PGPASSWORD}"/>
        <property name="port" value="${PGPORT:5432}"/>
        <property name="user" value="${PGUSER:postgres}"/>
    </bean>

    <bean class="PGNotifyToWebSocket" init-method="init" destroy-method="destroy">
        <constructor-arg ref="dataSource"/>
    </bean>

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/hello">
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/channels"/>
    </websocket:message-broker>

</beans>

index.jsp:

<html>

<body>

<p>Run <kbd>NOTIFY dml_events 'some message';</kbd> in Postgres (in the <code>$PGDATABASE</code> or <code>postgres</code> database). Tested with PG 9.3, on Windows 7, Chrome 36.</p>

<div id="out"></div>

<script src="//cdnjs.cloudflare.com/ajax/libs/sockjs-client/0.3.4/sockjs.min.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>

<script>

      var socket = new SockJS("/hello");

      var stompClient = Stomp.over(socket);

      stompClient.connect( {}, function(frame) {

        stompClient.subscribe("/channels/dml_events", function(response) {
            document.getElementById("out").innerText += response + "\r\n\r\n";
        });

      });

</script>
</body>

</html>
web.xml:

<web-app version="3.0"
         xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="
         http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd
         ">

    <servlet>
        <servlet-name>mvc-dispatcher</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    </servlet>

    <servlet-mapping>
        <servlet-name>mvc-dispatcher</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>

</web-app>