Sunday, April 20, 2014

Broadcasting PostgreSQL NOTIFY messages to WebSocket Clients

In this post, I look at how to broadcast NOTIFY messages from a Postgres database to WebSocket clients using Spring MVC 4.

Source code here

Basically, when data in your database changes, the database can notify web browser clients, without any polling.

Technologies used:


The system works like this:

Client subscribes to a WebSocket topic...

NOTIFY event on database server ->
  PGNotificationListener on web server ->
      Send WebSocket notification on server ->
         Receive WebSocket event on browser.

With the code below, if you call NOTIFY dml_events, 'some message'; in Postgres, it will be broadcast to all WebSocket clients.

Follow this answer regarding proper listener setup.

Project Structure:



pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the pg-notify-websocket demo, packaged as a WAR. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.databasepatterns</groupId>
    <artifactId>pg-notify-websocket</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>war</packaging>
    <url>http://blog.databasepatterns.com/2014/04/postgresql-nofify-websocket-spring-mvc.html</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single Spring version shared by every org.springframework dependency below. -->
        <spring.version>4.0.5.RELEASE</spring.version>
    </properties>

    <dependencies>

        <dependency>
            <!-- PostgreSQL JDBC driver (pgjdbc-ng). -->
            <!-- NOTE(review): the "complete" classifier presumably selects a bundled artifact; confirm. -->
            <groupId>com.impossibl.pgjdbc-ng</groupId>
            <artifactId>pgjdbc-ng</artifactId>
            <version>0.3</version>
            <classifier>complete</classifier>
        </dependency>

        <dependency>
            <!-- JSON library (Jackson); replace with the one you prefer. -->
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.3.2</version>
        </dependency>

        <dependency>
            <!-- Spring JDBC support. -->
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <!-- Spring messaging module. -->
            <groupId>org.springframework</groupId>
            <artifactId>spring-messaging</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <!-- Spring WebSocket module. -->
            <groupId>org.springframework</groupId>
            <artifactId>spring-websocket</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <!-- Spring MVC. -->
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>

    </dependencies>

    <build>

        <plugins>

            <plugin>
                <!-- Runs the WAR in an embedded Tomcat 7 (e.g. "mvn tomcat7:run"). -->
                <groupId>org.apache.tomcat.maven</groupId>
                <artifactId>tomcat7-maven-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <!-- Serve the application at the root context path. -->
                    <path>/</path>
                </configuration>
                <dependencies>
                    <!-- Added to the plugin's own classpath; presumably needed so the embedded
                         Tomcat supports WebSocket (confirm against the plugin documentation). -->
                    <dependency>
                        <groupId>org.apache.tomcat.embed</groupId>
                        <artifactId>tomcat-embed-websocket</artifactId>
                        <version>7.0.52</version>
                    </dependency>
                </dependencies>
            </plugin>

        </plugins>

    </build>

</project>

PGNotifyToWebSocket.java:

import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;

import javax.sql.DataSource;
import java.sql.Statement;


/**
 * Bridges PostgreSQL {@code NOTIFY} events to WebSocket subscribers.
 *
 * <p>The bean holds one dedicated database connection for its whole lifetime. Every
 * notification received on that connection is forwarded to the message broker
 * destination {@code /channels/<channelName>} with the notification payload as the
 * message body.
 *
 * <p>Lifecycle: construct with a {@link DataSource}, call {@link #init()} to start
 * listening, and call {@link #destroy()} to stop listening and release the connection.
 *
 * @since 7/22/2014
 */
public class PGNotifyToWebSocket {

    /** Name of the PostgreSQL notification channel this bean listens on. */
    private static final String CHANNEL = "dml_events";

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /** Dedicated connection that receives the notifications; closed in {@link #destroy()}. */
    private final PGConnection pgConnection;

    /**
     * Obtains a connection from the data source and registers the forwarding listener.
     * No {@code LISTEN} is issued yet; that happens in {@link #init()}.
     *
     * @param dataSource source of the connection; it is cast to {@link PGConnection},
     *                   so it must hand out pgjdbc-ng connections
     * @throws Throwable if the connection cannot be obtained or the listener cannot be registered
     */
    public PGNotifyToWebSocket(DataSource dataSource) throws Throwable {

        pgConnection = (PGConnection) dataSource.getConnection();

        pgConnection.addNotificationListener(new PGNotificationListener() {
            @Override
            public void notification(int processId, String channelName, String payload) {
                messagingTemplate.convertAndSend("/channels/" + channelName, payload);
            }
        });
    }

    /**
     * Subscribes the connection to the notification channel.
     *
     * @throws Throwable if the {@code LISTEN} statement fails
     */
    public void init() throws Throwable {

        Statement statement = pgConnection.createStatement();
        try {
            statement.execute("LISTEN " + CHANNEL);
        } finally {
            // Close the statement even when execute() fails.
            statement.close();
        }
    }

    /**
     * Unsubscribes from the notification channel and closes the dedicated connection.
     *
     * @throws Throwable if the {@code UNLISTEN} statement or closing the connection fails
     */
    public void destroy() throws Throwable {

        try {
            Statement statement = pgConnection.createStatement();
            try {
                statement.execute("UNLISTEN " + CHANNEL);
            } finally {
                statement.close();
            }
        } finally {
            // The connection was opened by this bean, so this bean must release it,
            // even if UNLISTEN failed.
            pgConnection.close();
        }
    }
}
mvc-dispatcher-servlet.xml:

<!-- Spring application context for the mvc-dispatcher servlet: MVC setup, the PostgreSQL
     data source, the NOTIFY-to-WebSocket bridge bean and the STOMP message broker. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:websocket="http://www.springframework.org/schema/websocket"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket.xsd
        ">

    <!-- Enables ${...} placeholders in the bean definitions below. No properties file is
         given, so values presumably come from system properties / environment variables
         (confirm against the Spring version in use). -->
    <context:property-placeholder/>

    <context:component-scan base-package="com.databasepatterns"/>

    <mvc:annotation-driven/>

    <!-- Serve the demo page for requests to the root path. -->
    <mvc:view-controller path="/" view-name="/WEB-INF/index.jsp"/>

    <!-- Connection settings use the ${NAME:default} form. PGPASSWORD has no default,
         so it has to be supplied. -->
    <bean class="com.impossibl.postgres.jdbc.PGDataSource" id="dataSource">
        <property name="host" value="${PGHOST:localhost}"/>
        <property name="database" value="${PGDATABASE:postgres}"/>
        <property name="password" value="${PGPASSWORD}"/>
        <property name="port" value="${PGPORT:5432}"/>
        <property name="user" value="${PGUSER:postgres}"/>
    </bean>

    <!-- The bridge bean: init() issues LISTEN after construction and injection,
         destroy() issues UNLISTEN on context shutdown. -->
    <bean class="PGNotifyToWebSocket" init-method="init" destroy-method="destroy">
        <constructor-arg ref="dataSource"/>
    </bean>

    <!-- STOMP over WebSocket: clients connect to /hello (with SockJS enabled) and subscribe
         to destinations under /channels, which the simple broker handles. -->
    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/hello">
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/channels"/>
    </websocket:message-broker>

</beans>

index.jsp:

<%-- Demo page: subscribes to /channels/dml_events over STOMP/SockJS and appends every
     received frame to the "out" element. --%>
<html>

<body>

<p>Run <kbd>NOTIFY dml_events, 'some message';</kbd> in Postgres (in the <code>$PGDATABASE</code> or <code>postgres</code> database). Tested with PG 9.3, on Windows 7, Chrome 36.</p>

<div id="out"></div>

<script src="//cdnjs.cloudflare.com/ajax/libs/sockjs-client/0.3.4/sockjs.min.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>

<script>

      // Connect to the STOMP endpoint at "/hello".
      var socket = new SockJS("/hello");

      var stompClient = Stomp.over(socket);

      stompClient.connect( {}, function(frame) {

        // Subscribe only once the STOMP session is established.
        stompClient.subscribe("/channels/dml_events", function(response) {
            // "response" is the received STOMP frame; string concatenation appends its
            // text form. NOTE(review): response.body should hold only the payload.
            document.getElementById("out").innerText += response + "\r\n\r\n";
        });

      });

</script>
</body>

</html>
web.xml:

<!-- Servlet 3.0 deployment descriptor: routes every request to Spring's DispatcherServlet. -->
<web-app version="3.0"
         xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="
         http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd
         ">

    <servlet>
        <!-- With this servlet name, Spring by default loads /WEB-INF/mvc-dispatcher-servlet.xml. -->
        <servlet-name>mvc-dispatcher</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <!-- Create the Spring context at deployment rather than on the first request, so the
             database listener starts immediately and configuration errors surface early. -->
        <load-on-startup>1</load-on-startup>
        <!-- Needed by the SockJS HTTP streaming / long-polling fallback transports. -->
        <async-supported>true</async-supported>
    </servlet>

    <servlet-mapping>
        <servlet-name>mvc-dispatcher</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>

</web-app>

7 comments:

Unknown said...

Can you do this with Oracle or any other database?

Neil McGuigan said...

@Rodrick you can use DBMS_ALERT in Oracle to do something similar:

http://docs.oracle.com/cd/E11882_01/appdev.112/e16760/d_alert.htm#ARPLS351

Ming said...

I tried this. It did not work. I did not see any error message. How should I debug this thing?

Alon Amir said...

My default PostgreSQL driver from now on, thanks.

Unknown said...

This article demonstrates to you generally accepted methods to actualize IVR expansions with your twilio telephone number.ivr telephony

de said...
This comment has been removed by a blog administrator.
Unknown said...
This comment has been removed by a blog administrator.