Exposing Apache Karaf configurations with Apache Arrow Flight

This blog post shows how to use Apache Arrow and Arrow Flight in the Apache Karaf runtime. The code base is available here: https://github.com/jbonofre/karaf-arrow.

What is Apache Arrow?

Apache Arrow is composed of:
  • The Arrow format (specification).
  • Format implementations with different languages (C, Java, Go, Rust, ...).
  • Extra subprojects like Flight, Flight SQL, ADBC, nanoarrow, ...
The Apache Arrow format is an in-memory columnar format, very efficient for scanning and iterating over large chunks of data. A traditional memory buffer is organized by row. The Apache Arrow format groups data by column instead, which is much more efficient on modern CPUs/GPUs.
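
To make the row versus column distinction concrete, here is a minimal plain-Java sketch. It does not use Arrow at all: the LayoutSketch class, the Person type and the sample values are made up for illustration. It only shows why scanning one field is simpler when the values of that field sit next to each other.

```java
import java.util.List;

// Illustrative only: LayoutSketch and Person are hypothetical names, not Arrow classes.
final class LayoutSketch {

    // Row-oriented: one object per record, the fields of a record sit together.
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Summing one field means visiting every record object.
    static long sumAgesRowOriented(List<Person> rows) {
        long sum = 0;
        for (Person person : rows) {
            sum += person.age;
        }
        return sum;
    }

    // Column-oriented: one array per field, so the scan is a sequential pass over one array.
    static long sumAgesColumnar(int[] ageColumn) {
        long sum = 0;
        for (int age : ageColumn) {
            sum += age;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Person> rows = List.of(new Person("ada", 36), new Person("bob", 41), new Person("cy", 13));
        int[] ageColumn = {36, 41, 13};
        System.out.println(sumAgesRowOriented(rows));
        System.out.println(sumAgesColumnar(ageColumn));
    }
}
```

Both methods compute the same sum. The columnar version reads a single contiguous array, which is the access pattern the Arrow format is designed around.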
The Apache Arrow format is basically a specification. We have several implementations for this specification:
  • C
  • C++
  • C#
  • Go
  • Java
  • Python
  • Ruby
  • Rust
As the purpose of this blog post is to show Apache Arrow in Apache Karaf, we will use Arrow Java. In addition, Apache Arrow provides several subprojects, built on top of some of these implementations:
  • ADBC (Arrow Database Connectivity) is an API specification for Arrow-based database access. It's similar to JDBC, but in an Arrow-centric way.
  • Arrow Flight and Arrow Flight SQL are RPC frameworks (see later).
  • DataFusion is a query engine.
In this blog post, we will use Apache Arrow Flight, an RPC framework for exchanging Arrow data, built on top of gRPC and Arrow IPC.

Arrow Flight server in Karaf runtime

The config-flight-provider is an OSGi bundle that exposes the Configurations of the ConfigurationAdmin service as Arrow streams. It's composed of:
  • ConfigDataset represents a Configuration (identified by a PID) using an Arrow schema and vectors.
  • ConfigProducer is an Arrow Flight producer exposing each Configuration as an Arrow Flight stream, using the PID as stream path.
  • Activator is an OSGi bundle activator, loading all ConfigDatasets using the ConfigurationAdmin service and starting an Arrow Flight server.

Representing Apache Karaf Configurations as Arrow schema and vectors

Let's take a look at ConfigDataset.

The ConfigurationAdmin Karaf service gives access to all runtime Configurations. Each Configuration has a unique id, the PID (persistent id). A Configuration contains a list of key/value pairs, the configuration properties.

For example, the Configuration with PID my.karaf.configuration could contain the properties foo/bar and john/doo.

The ConfigDataset class represents the Configuration properties (the associated PID is managed in ConfigProducer, as we will see later).

In Arrow, we can represent a list of values with a Vector. So, to represent Configuration properties, we will use two vectors: a key vector and a value vector, both as VarCharVector.

Then, we describe the Configuration properties using an Arrow schema with two fields: the key field is not nullable, the value field is nullable.
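
Before looking at the Arrow version, the two-column idea can be sketched in plain Java. This is only an illustration without Arrow classes: KeyValueColumns is a hypothetical name, and two parallel lists stand in for the key and value vectors. Row i of the dataset is the pair made of keys.get(i) and values.get(i).

```java
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.List;

// Illustrative only: a plain-Java stand-in for the key/value column layout.
final class KeyValueColumns {

    final List<String> keys = new ArrayList<>();    // "key" column: never null
    final List<String> values = new ArrayList<>();  // "value" column: null allowed

    // Flatten a properties dictionary into two aligned columns, one row per property.
    static KeyValueColumns from(Dictionary<String, Object> properties) {
        KeyValueColumns columns = new KeyValueColumns();
        Enumeration<String> names = properties.keys();
        while (names.hasMoreElements()) {
            String name = names.nextElement();
            Object value = properties.get(name);
            columns.keys.add(name);
            columns.values.add(value == null ? null : value.toString());
        }
        return columns;
    }

    int rowCount() {
        return keys.size();
    }

    public static void main(String[] args) {
        Dictionary<String, Object> properties = new Hashtable<>();
        properties.put("foo", "bar");
        properties.put("john", "doo");
        KeyValueColumns columns = from(properties);
        for (int row = 0; row < columns.rowCount(); row++) {
            System.out.println(columns.keys.get(row) + "\t" + columns.values.get(row));
        }
    }
}
```

Filling both columns in the same loop keeps each key aligned with its value, whatever iteration order the dictionary uses.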

Let's take a look at the ConfigDataset code:
  
package net.nanthrax.karaf.arrow;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.osgi.service.cm.Configuration;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.Enumeration;

public class ConfigDataset implements AutoCloseable {

    private final Schema schema;
    private final VectorSchemaRoot root;
    private final long rows;

    public ConfigDataset(Configuration configuration, BufferAllocator allocator) {
        this.schema = new Schema(Arrays.asList(
                new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null),
                new Field("value", FieldType.nullable(new ArrowType.Utf8()), null)
        ));

        // getProperties() returns a private copy on each call, so read it once
        // and fill both vectors in the same pass to keep keys and values aligned.
        Dictionary<String, Object> properties = configuration.getProperties();
        int size = properties.size();

        VarCharVector keyVector = new VarCharVector("key", allocator);
        keyVector.allocateNew(size);
        VarCharVector valueVector = new VarCharVector("value", allocator);
        valueVector.allocateNew(size);

        int index = 0;
        Enumeration<String> keys = properties.keys();
        while (keys.hasMoreElements()) {
            String key = keys.nextElement();
            String value = properties.get(key).toString();
            // setSafe() grows the underlying buffers when a value does not fit
            keyVector.setSafe(index, key.getBytes(StandardCharsets.UTF_8));
            valueVector.setSafe(index, value.getBytes(StandardCharsets.UTF_8));
            index++;
        }
        keyVector.setValueCount(size);
        valueVector.setValueCount(size);

        this.root = new VectorSchemaRoot(schema, Arrays.asList(keyVector, valueVector), size);

        this.rows = size;
    }

    public Schema getSchema() {
        return this.schema;
    }

    public VectorSchemaRoot getRoot() {
        return this.root;
    }

    public long getRows() {
        return this.rows;
    }

    @Override
    public void close() {
        root.close();
    }

}
  
  
We can see that the constructor builds the schema with the two fields (key and value). Then, we create the two corresponding vectors. For the memory allocation, we use the number of properties in the Configuration (configuration.getProperties().size()). Then, we iterate over the Configuration properties to populate the vectors. It's important to set the number of values in each vector with setValueCount().

Once we have the schema and the vectors, we "assemble" them in a VectorSchemaRoot.

Implementing an Arrow FlightProducer

Now that ConfigDataset represents a configuration, we can expose it through an Arrow Flight producer.

The ConfigProducer is an Arrow Flight producer: it implements the methods that the Arrow Flight server binds to its RPC endpoints (leveraging gRPC under the hood). In our case, we implement three of them:
  • getStream() handles the DoGet call and returns a data stream (in our case, the configuration data stream for a given PID).
  • getFlightInfo() handles the GetFlightInfo call and returns information about a stream (for a given PID).
  • listFlights() handles the ListFlights call and returns all available flight streams.

Let's take a look at the ConfigProducer code:
  
package net.nanthrax.karaf.arrow;

import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.osgi.framework.BundleContext;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConfigProducer extends NoOpFlightProducer implements AutoCloseable {

    private final static Logger LOGGER = LoggerFactory.getLogger(ConfigProducer.class);

    private final BufferAllocator allocator;
    private final Location location;
    private final ConcurrentMap<FlightDescriptor, ConfigDataset> datasets;

    public ConfigProducer(BufferAllocator allocator, Location location) {
        this.allocator = allocator;
        this.datasets = new ConcurrentHashMap<>();
        this.location = location;
    }

    public void init(BundleContext bundleContext) throws IllegalStateException, IOException, InvalidSyntaxException {
        ServiceReference<ConfigurationAdmin> serviceReference = bundleContext.getServiceReference(ConfigurationAdmin.class);
        if (serviceReference == null) {
            throw new IllegalStateException("ConfigurationAdmin service is not available");
        }
        ConfigurationAdmin configurationAdmin = bundleContext.getService(serviceReference);
        if (configurationAdmin == null) {
            throw new IllegalStateException("ConfigurationAdmin service is not available");
        }
        init(configurationAdmin);
    }

    public void init(ConfigurationAdmin configurationAdmin) throws IllegalStateException, IOException, InvalidSyntaxException {
        Configuration[] configurations = configurationAdmin.listConfigurations(null);
        if (configurations == null) {
            // listConfigurations() returns null, not an empty array, when nothing matches
            return;
        }
        for (Configuration configuration : configurations) {
            FlightDescriptor flightDescriptor = FlightDescriptor.path(configuration.getPid());
            LOGGER.info("Processing configuration {}", flightDescriptor.getPath());
            ConfigDataset dataset = new ConfigDataset(configuration, allocator);
            datasets.put(flightDescriptor, dataset);
        }
    }

    @Override
    public void getStream(CallContext callContext, Ticket ticket, ServerStreamListener listener) {
        String pid = new String(ticket.getBytes(), StandardCharsets.UTF_8);
        LOGGER.info("Get stream for PID {}", pid);
        FlightDescriptor flightDescriptor = FlightDescriptor.path(pid);
        ConfigDataset dataset = this.datasets.get(flightDescriptor);
        if (dataset == null) {
            throw CallStatus.NOT_FOUND.withDescription("PID not found").toRuntimeException();
        }
        listener.start(dataset.getRoot());
        LOGGER.info("   Row count: {}", dataset.getRoot().getRowCount());
        listener.putNext();
        listener.completed();
    }

    @Override
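    public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
        ConfigDataset dataset = datasets.get(descriptor);
        if (dataset == null) {
            // Unknown PID: answer NOT_FOUND instead of failing with a NullPointerException
            throw CallStatus.NOT_FOUND.withDescription("PID not found").toRuntimeException();
        }
        // The ticket carries the PID as UTF-8 bytes; getStream() decodes it the same way
        Ticket ticket = new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8));
        FlightEndpoint flightEndpoint = new FlightEndpoint(ticket, location);
        return new FlightInfo(
                dataset.getSchema(),
                descriptor,
                Collections.singletonList(flightEndpoint),
                -1,
                dataset.getRows()
        );
    }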

    @Override
    public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
        datasets.forEach((k, v) -> { listener.onNext(getFlightInfo(context, k));});
        listener.onCompleted();
    }

    public ConfigDataset getConfigDataset(String pid) {
        FlightDescriptor flightDescriptor = FlightDescriptor.path(pid);
        return datasets.get(flightDescriptor);
    }

    @Override
    public void close() throws Exception {
        AutoCloseables.close(datasets.values());
    }
}  
  
In the init() method, we get the ConfigurationAdmin Karaf service and populate the datasets map. This map contains all ConfigDatasets, keyed by a FlightDescriptor built from the PID. As we will see later, the init() method is called in the bundle activator.

The getFlightInfo() method uses the FlightDescriptor to look up the corresponding ConfigDataset in the datasets map, and takes the schema and the number of rows from it. It also creates a FlightEndpoint whose ticket carries the PID, so the client can hand this ticket back to getStream().

The listFlights() method simply calls getFlightInfo() for each dataset and pushes the resulting flight infos to the provided StreamListener.

The "main" method is getStream(): it gets the PID from the ticket provided by the client and retrieves the corresponding ConfigDataset from the datasets map. If the PID key doesn't exist in the map, we throw CallStatus.NOT_FOUND.withDescription("PID not found").toRuntimeException(). If the PID is found, we retrieve the VectorSchemaRoot. We start the provided ServerStreamListener with this root, send the data to the client (listener.putNext()) and complete (close) the stream with listener.completed().
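
The ticket round trip can be sketched in plain Java, without any Arrow class. The names below (TicketLookupSketch, register, ticketFor, resolve) are hypothetical, and NoSuchElementException only stands in for the CallStatus.NOT_FOUND error of the real producer. The point is that a ticket is just bytes: here the server chooses to encode the PID as UTF-8 and decodes it the same way when the ticket comes back.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: a plain-Java stand-in for the ticket lookup done in getStream().
final class TicketLookupSketch {

    // Hypothetical registry: PID -> rows of that configuration.
    private final Map<String, List<String>> datasets = new ConcurrentHashMap<>();

    void register(String pid, List<String> rows) {
        datasets.put(pid, rows);
    }

    // What the server hands out in the flight info: the PID encoded as UTF-8 bytes.
    static byte[] ticketFor(String pid) {
        return pid.getBytes(StandardCharsets.UTF_8);
    }

    // What the server does when a client presents the ticket.
    List<String> resolve(byte[] ticket) {
        String pid = new String(ticket, StandardCharsets.UTF_8);
        List<String> rows = datasets.get(pid);
        if (rows == null) {
            // Stand-in for CallStatus.NOT_FOUND in the real producer
            throw new NoSuchElementException("PID not found: " + pid);
        }
        return rows;
    }

    public static void main(String[] args) {
        TicketLookupSketch server = new TicketLookupSketch();
        server.register("my.karaf.configuration", List.of("foo=bar", "john=doo"));
        System.out.println(server.resolve(ticketFor("my.karaf.configuration")));
    }
}
```

Because the same charset is used on both sides, any PID survives the trip from getFlightInfo() to getStream() unchanged.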

Starting Arrow Flight Server

Finally, we need to "connect the dots" (injecting the ConfigurationAdmin Karaf service) and start the Arrow Flight server. This is done in the Activator.

In the start() method of the Activator:
  • we create the root Arrow allocator (that we pass to the other components)
  • we create the location where the Flight server will listen
  • we create and init the ConfigProducer
  • and finally we create and start an Arrow Flight server
  
package net.nanthrax.karaf.arrow;

import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Activator implements BundleActivator {

    private final static Logger LOGGER = LoggerFactory.getLogger(Activator.class);

    private BufferAllocator allocator;
    private ConfigProducer configProducer;
    private FlightServer flightServer;

    @Override
    public void start(BundleContext context) throws Exception {
        allocator = new RootAllocator();
        Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
        configProducer = new ConfigProducer(allocator, location);
        configProducer.init(context);
        flightServer = FlightServer.builder(allocator, location, configProducer).build();
        flightServer.start();
        LOGGER.info("Starting Arrow Flight Server on port {}", flightServer.getPort());
    }

    @Override
    public void stop(BundleContext context) throws Exception {
        LOGGER.info("Stopping Arrow Flight Server");
        flightServer.shutdown();
        configProducer.close();
        allocator.close();
    }
}
  
  
The Activator is declared in the OSGi bundle headers using the Bundle-Activator header.
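
Note that the stop() method releases resources in the reverse order of their creation: server first, then producer, then allocator. The allocator goes last because the datasets held by the producer still own buffers allocated from it, and, as far as I know, closing an Arrow allocator with outstanding allocations is reported as a leak. The ordering itself can be sketched in plain Java. ReverseShutdownSketch and its methods are hypothetical names, and no Arrow or OSGi class is involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative only: a plain-Java stand-in for the Activator start/stop ordering.
final class ReverseShutdownSketch {

    private final Deque<Runnable> closers = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    // Record a resource as opened and remember how to close it.
    void open(String name) {
        log.add("open " + name);
        closers.push(() -> log.add("close " + name));
    }

    // Close everything, most recently opened first.
    void closeAll() {
        while (!closers.isEmpty()) {
            closers.pop().run();
        }
    }

    public static void main(String[] args) {
        ReverseShutdownSketch lifecycle = new ReverseShutdownSketch();
        lifecycle.open("allocator");
        lifecycle.open("producer");
        lifecycle.open("server");
        lifecycle.closeAll();
        lifecycle.log.forEach(System.out::println);
    }
}
```

The stack guarantees that whatever was opened last is closed first, which is the same order the Activator follows by hand.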

Deploy in Apache Karaf runtime

For convenience, we create a Karaf features repository for deployment. Even though the config-flight-provider bundle embeds most of its dependencies as private packages, it still imports some packages, meaning that the bundles providing them have to be deployed first in the runtime. That's why a features repository simplifies the deployment:
  
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<features name="karaf-arrow-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.3.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.3.0 http://karaf.apache.org/xmlns/features/v1.3.0">

    <feature name="karaf-arrow-server" version="${project.version}">
        <bundle dependency="true">mvn:javax.annotation/javax.annotation-api/1.3.2</bundle>
        <bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-core/2.15.3</bundle>
        <bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-annotations/2.15.3</bundle>
        <bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-databind/2.15.3</bundle>
        <bundle dependency="true">mvn:com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.15.3</bundle>
        <bundle dependency="true">mvn:commons-codec/commons-codec/1.16.0</bundle>
        <bundle>mvn:net.nanthrax.karaf.arrow/config-flight-provider/${project.version}</bundle>
    </feature>

</features>
  
Let's use the Apache Karaf 4.4.4 runtime. Extract Apache Karaf (you can download it from https://www.apache.org/dyn/closer.lua/karaf/4.4.4/apache-karaf-4.4.4.tar.gz) in the folder of your choice.

When using Apache Arrow with Java 9 or later, the java.nio package has to be opened at the JVM level. In Apache Karaf, to do that, you have to edit the bin/karaf script and add:

--add-opens=java.base/java.nio=ALL-UNNAMED

in the start section.

Now, we can start the Karaf runtime with:

$ bin/karaf
        __ __                  ____      
       / //_/____ __________ _/ __/      
      / ,<  / __ `/ ___/ __ `/ /_        
     / /| |/ /_/ / /  / /_/ / __/        
    /_/ |_|\__,_/_/   \__,_/_/         

  Apache Karaf (4.4.4)

Hit '<tab>' for a list of available commands
and '[cmd] --help' for help on a specific command.
Hit '<ctrl-d>' or type 'system:shutdown' or 'logout' to shutdown Karaf.

karaf@root()>   

Now, we can register our features repository and deploy our bundle via the karaf-arrow-server feature:

karaf@root()> feature:repo-add mvn:net.nanthrax.karaf.arrow/karaf-arrow/1.0-SNAPSHOT/xml/features
Adding feature url mvn:net.nanthrax.karaf.arrow/karaf-arrow/1.0-SNAPSHOT/xml/features
karaf@root()> feature:install karaf-arrow-server

In the log (you can use log:tail or log:display command), you can see the Arrow Flight server started:

17:57:06.021 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.config]
17:57:06.021 INFO [FelixStartLevel] Processing configuration [profile]
17:57:06.021 INFO [FelixStartLevel] Processing configuration [jmx.acl.osgi.compendium.cm]
17:57:06.022 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.shell]
17:57:06.022 INFO [FelixStartLevel] Processing configuration [org.ops4j.pax.url.mvn]
17:57:06.023 INFO [FelixStartLevel] Processing configuration [jmx.acl.org.apache.karaf.bundle]
17:57:06.023 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.system]
17:57:06.023 INFO [FelixStartLevel] Processing configuration [org.ops4j.pax.logging]
17:57:06.024 INFO [FelixStartLevel] Processing configuration [jmx.acl]
17:57:06.024 INFO [FelixStartLevel] Processing configuration [jmx.acl.org.apache.camel]
17:57:06.024 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.kar]
17:57:06.025 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.feature]
17:57:06.104 INFO [FelixStartLevel] Starting Arrow Flight Server on port 33333

Now, we can use an Arrow Flight client to interact with the server.

Arrow Flight Client

An Arrow Flight client is pretty easy to implement. In this blog post, we write a simple CLI client that interacts with the Flight server located in the Karaf runtime.

The client does the following:
  • it creates an Arrow Flight client using the location of our Flight server (localhost:33333 in our case)
  • it gets the FlightInfo for the PID of the configuration we want to read
  • using the FlightInfo, it gets the FlightStream from the client
  • while reading the FlightStream, it directly gets the VectorSchemaRoot provided by the server

We just run the client as a regular Java application (using java -jar for instance). We can see on the console:

[main] INFO org.apache.arrow.memory.BaseAllocator - Debug mode disabled. Enable with the VM option -Darrow.memory.debug.allocator=true.
[main] INFO org.apache.arrow.memory.DefaultAllocationManagerOption - allocation manager type not specified, using netty as the default type
[main] INFO org.apache.arrow.memory.CheckAllocator - Using DefaultAllocationManager at memory-netty/14.0.1/arrow-memory-netty-14.0.1.jar!/org/apache/arrow/memory/DefaultAllocationManagerFactory.class

Client connected to grpc+tcp://localhost:33333

Flight info: FlightInfo{schema=Schema, descriptor=org.apache.karaf.shell, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@3369b90d, expirationTime=(none)}], bytes=-1, records=12, ordered=false}

Row count: 12

key	value
completionMode	GLOBAL
disableEofExit	false
disableLogout	false
felix.fileinstall.filename	file:/Users/jbonofre/Downloads/apache-karaf-4.4.4/etc/org.apache.karaf.shell.cfg
hostKey	/Users/jbonofre/Downloads/apache-karaf-4.4.4/etc/host.key
service.pid	org.apache.karaf.shell
sftpEnabled	true
sshHost	0.0.0.0
sshIdleTimeout	1800000
sshPort	8101
sshRealm	karaf
sshRole	ssh

We retrieve all properties from the org.apache.karaf.shell configuration remotely using Arrow Flight!

This blog post is the first one of a series about Apache Arrow, Arrow Flight, Arrow Flight SQL, etc.

Enjoy!
