Exposing Apache Karaf configurations with Apache Arrow Flight
This blog post shows how to use Apache Arrow and Arrow Flight in the Apache Karaf runtime.
The code base is available here: https://github.com/jbonofre/karaf-arrow.
Apache Arrow ?
Apache Arrow is composed of:
- The Arrow format (specification).
- Format implementations in different languages (C, Java, Go, Rust, ...).
- Extra subprojects like Flight, Flight SQL, ADBC, nanoarrow, ...
The Arrow format is implemented in several languages, including:
- C
- C++
- C#
- Go
- Java
- Python
- Ruby
- Rust
Among the subprojects:
- ADBC (Arrow Database Connectivity) is an API specification for Arrow-based database access. It's similar to JDBC, but Arrow-centric.
- Arrow Flight and Arrow Flight SQL are RPC frameworks (see later).
- DataFusion is a query engine, written in Rust, that uses Arrow as its in-memory format.
Arrow Flight server in Karaf runtime
The config-flight-provider is an OSGi bundle that exposes the Configurations managed by the ConfigurationAdmin service as Arrow streams.
It's composed of:
- ConfigDataset represents a Configuration (identified by a PID) using an Arrow schema and vectors.
- ConfigProducer is an Arrow Flight producer exposing each Configuration as an Arrow Flight stream, using the PID as the stream path.
- Activator is an OSGi bundle activator, loading all ConfigDatasets using the ConfigurationAdmin service and starting an Arrow Flight server.
Representing Apache Karaf Configurations as Arrow schema and vectors
Let's take a look at ConfigDataset.
Using the ConfigurationAdmin Karaf service, we can get all runtime Configurations. Each Configuration has a unique id, the PID (persistent identifier). A Configuration contains a list of key/value pairs: the configuration properties.
For example, we can have a Configuration with PID my.karaf.configuration, containing the foo/bar and john/doo properties.
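To make this example concrete, here is a minimal plain-Java sketch (no Arrow dependency; the PropertyColumns class is hypothetical) that takes the my.karaf.configuration properties as a Dictionary, the type returned by Configuration.getProperties(), and splits them into a key column and a value column sharing the same row index. A Hashtable stands in for the real Configuration.

```java
import java.util.ArrayList;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.List;

// Plain-Java model (no Arrow dependency) of the key/value split performed by ConfigDataset:
// the row-oriented Dictionary becomes two parallel columns sharing the same row index.
class PropertyColumns {
    final List<String> keys = new ArrayList<>();
    final List<String> values = new ArrayList<>();

    static PropertyColumns fromDictionary(Dictionary<String, Object> properties) {
        PropertyColumns columns = new PropertyColumns();
        // Walk the keys once and look each value up, so a key and its value
        // always end up at the same index in both columns.
        Enumeration<String> keys = properties.keys();
        while (keys.hasMoreElements()) {
            String key = keys.nextElement();
            columns.keys.add(key);
            columns.values.add(String.valueOf(properties.get(key)));
        }
        return columns;
    }

    public static void main(String[] args) {
        // Hashtable stands in for the Dictionary returned by Configuration.getProperties().
        Dictionary<String, Object> properties = new Hashtable<>();
        properties.put("foo", "bar");
        properties.put("john", "doo");

        PropertyColumns columns = fromDictionary(properties);
        // Hashtable iteration order is unspecified, so the row order may vary.
        for (int row = 0; row < columns.keys.size(); row++) {
            System.out.println(row + ": " + columns.keys.get(row) + " -> " + columns.values.get(row));
        }
    }
}
```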
The ConfigDataset class represents the Configuration properties (the associated PID is managed in ConfigProducer, as we will see later).
In Arrow, we can represent a list of values with a vector. So, to represent the Configuration properties, we use two vectors: a key vector and a value vector, both VarCharVector (UTF-8 strings).
Then, we describe the Configuration properties using an Arrow schema with two fields: the key field is not nullable, the value field is nullable.
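To picture what a VarCharVector holds, the following simplified model (hypothetical Utf8ColumnModel class, not the Arrow API) mimics the Arrow variable-width string layout: one contiguous UTF-8 data buffer, an offsets array with one more entry than there are values, and a validity flag per slot. Arrow stores validity as a bitmap; a list of booleans stands in for it here. The key column is created non-nullable and the value column nullable, matching the schema.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified, plain-Java model of the Arrow variable-width (Utf8) layout used by
// VarCharVector: validity per slot, n + 1 offsets, one contiguous data buffer.
// This is NOT the Arrow API, only an illustration of the memory layout.
class Utf8ColumnModel {
    private final boolean nullable;
    private final List<Integer> offsets = new ArrayList<>();
    private final List<Boolean> validity = new ArrayList<>(); // stands in for the bitmap
    private final ByteArrayOutputStream data = new ByteArrayOutputStream();

    Utf8ColumnModel(boolean nullable) {
        this.nullable = nullable;
        offsets.add(0); // offsets always start at 0
    }

    void append(String value) {
        if (value == null) {
            if (!nullable) {
                throw new IllegalArgumentException("null in a non-nullable field");
            }
            validity.add(false);
        } else {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            data.write(bytes, 0, bytes.length);
            validity.add(true);
        }
        // The end offset of this slot; a null slot repeats the previous offset.
        offsets.add(data.size());
    }

    int valueCount() {
        return validity.size();
    }

    String get(int index) {
        if (!validity.get(index)) {
            return null;
        }
        int start = offsets.get(index);
        int end = offsets.get(index + 1);
        return new String(data.toByteArray(), start, end - start, StandardCharsets.UTF_8);
    }

    List<Integer> offsets() {
        return offsets;
    }

    public static void main(String[] args) {
        Utf8ColumnModel keys = new Utf8ColumnModel(false);  // "key": not nullable
        Utf8ColumnModel values = new Utf8ColumnModel(true); // "value": nullable
        keys.append("foo");
        values.append("bar");
        keys.append("john");
        values.append("doo");
        System.out.println(keys.offsets()); // [0, 3, 7]
    }
}
```

In this model, a null value costs no data bytes: its slot simply repeats the previous offset, and only the validity flag records that it is null.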
Let's take a look at the ConfigDataset code:
package net.nanthrax.karaf.arrow;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.osgi.service.cm.Configuration;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.Enumeration;
public class ConfigDataset implements AutoCloseable {
private final Schema schema;
private final VectorSchemaRoot root;
private final long rows;
public ConfigDataset(Configuration configuration, BufferAllocator allocator) {
this.schema = new Schema(Arrays.asList(
new Field("key", FieldType.notNullable(new ArrowType.Utf8()), null),
new Field("value", FieldType.nullable(new ArrowType.Utf8()), null)
));
Dictionary<String, Object> properties = configuration.getProperties();
int size = properties.size();
VarCharVector keyVector = new VarCharVector("key", allocator);
keyVector.allocateNew(size);
VarCharVector valueVector = new VarCharVector("value", allocator);
valueVector.allocateNew(size);
// iterate on the keys only and look up each value, so that a key and its value
// always land at the same index in both vectors
int index = 0;
Enumeration<String> keys = properties.keys();
while (keys.hasMoreElements()) {
String key = keys.nextElement();
// setSafe() grows the data buffer when needed, set() does not
keyVector.setSafe(index, key.getBytes(StandardCharsets.UTF_8));
valueVector.setSafe(index, properties.get(key).toString().getBytes(StandardCharsets.UTF_8));
index++;
}
// filling the vectors doesn't update their value count: set it explicitly
keyVector.setValueCount(size);
valueVector.setValueCount(size);
this.root = new VectorSchemaRoot(schema, Arrays.asList(keyVector, valueVector), size);
this.rows = size;
}
public Schema getSchema() {
return this.schema;
}
public VectorSchemaRoot getRoot() {
return this.root;
}
public long getRows() {
return this.rows;
}
@Override
public void close() {
root.close();
}
}
In the constructor, we first build the schema with the two fields (key and value). Then, we create the two corresponding vectors, using the number of properties in the Configuration (configuration.getProperties().size()) for the initial memory allocation. Then, we iterate over the Configuration properties to populate the vectors. It's important to set the number of values in each vector with vector.setValueCount(), passing the number of properties: filling the buffers does not update the value count, so without this call the vectors would report zero values.
Once we have the schema and the vectors, we "assemble" them in a VectorSchemaRoot.
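The VectorSchemaRoot ties the schema to its vectors and carries a single row count for all of them. A minimal plain-Java model of that invariant (hypothetical RecordBatchModel class, not the Arrow API) could look like this: every column must hold exactly rowCount values, non-nullable fields must not contain nulls, and reading a row means reading the same index in every column.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of "assembling" a record batch, in the spirit of VectorSchemaRoot:
// one schema, one column per field, and a single row count shared by all columns.
class RecordBatchModel {
    static final class FieldSpec {
        final String name;
        final boolean nullable;

        FieldSpec(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }
    }

    final List<FieldSpec> schema;
    final List<List<String>> columns;
    final int rowCount;

    RecordBatchModel(List<FieldSpec> schema, List<List<String>> columns, int rowCount) {
        if (schema.size() != columns.size()) {
            throw new IllegalArgumentException("one column per field is required");
        }
        for (int i = 0; i < columns.size(); i++) {
            List<String> column = columns.get(i);
            // Every column must hold exactly rowCount values (the role of setValueCount()).
            if (column.size() != rowCount) {
                throw new IllegalArgumentException(schema.get(i).name + " has "
                        + column.size() + " values, expected " + rowCount);
            }
            if (!schema.get(i).nullable && column.contains(null)) {
                throw new IllegalArgumentException(schema.get(i).name + " is not nullable");
            }
        }
        this.schema = schema;
        this.columns = columns;
        this.rowCount = rowCount;
    }

    // Reading a row means reading the same index in every column.
    List<String> row(int index) {
        String[] values = new String[columns.size()];
        for (int i = 0; i < columns.size(); i++) {
            values[i] = columns.get(i).get(index);
        }
        return Arrays.asList(values);
    }

    public static void main(String[] args) {
        List<FieldSpec> schema = Arrays.asList(new FieldSpec("key", false), new FieldSpec("value", true));
        RecordBatchModel batch = new RecordBatchModel(schema,
                Arrays.asList(Arrays.asList("foo", "john"), Arrays.asList("bar", "doo")), 2);
        System.out.println(batch.row(1)); // [john, doo]
    }
}
```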
Implementing an Arrow FlightProducer
Now that we have the ConfigDataset configuration representation, we can expose an Arrow Flight producer.
The ConfigProducer is an Arrow Flight producer: it implements the methods that the Arrow Flight server exposes as RPC calls (leveraging gRPC under the hood). In our case, we expose three services:
- getStream() implements the DoGet call, used to retrieve a data stream (in our case, the configuration data stream for a given PID)
- getFlightInfo() implements the GetFlightInfo call, used to retrieve information about a stream (for a given PID)
- listFlights() implements the ListFlights call, used to get all available flight streams
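The interplay between these three calls can be sketched without Arrow. In the following plain-Java model (the ProducerModel and Info names are hypothetical), a dataset is reduced to its rows, Info stands in for FlightInfo, and NoSuchElementException stands in for CallStatus.NOT_FOUND. The ticket returned by getFlightInfo() carries the PID as UTF-8 bytes, and getStream() decodes it to find the dataset, as ConfigProducer does.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Plain-Java model (no Arrow dependency) of the three operations ConfigProducer overrides.
class ProducerModel {
    // Reduced stand-in for FlightInfo: stream path, ticket bytes, record count.
    static final class Info {
        final String path;
        final byte[] ticket;
        final long records;

        Info(String path, byte[] ticket, long records) {
            this.path = path;
            this.ticket = ticket;
            this.records = records;
        }
    }

    private final Map<String, List<String[]>> datasets = new TreeMap<>();

    void register(String pid, List<String[]> rows) {
        datasets.put(pid, rows);
    }

    // getFlightInfo(): describe one stream; the ticket carries the PID as UTF-8 bytes.
    Info getFlightInfo(String path) {
        List<String[]> rows = lookup(path);
        return new Info(path, path.getBytes(StandardCharsets.UTF_8), rows.size());
    }

    // listFlights(): one Info per registered dataset.
    List<Info> listFlights() {
        List<Info> infos = new ArrayList<>();
        for (String path : datasets.keySet()) {
            infos.add(getFlightInfo(path));
        }
        return infos;
    }

    // getStream(): redeem a ticket previously returned by getFlightInfo().
    List<String[]> getStream(byte[] ticket) {
        return lookup(new String(ticket, StandardCharsets.UTF_8));
    }

    // Unknown PID: the real producer throws CallStatus.NOT_FOUND instead.
    private List<String[]> lookup(String pid) {
        List<String[]> rows = datasets.get(pid);
        if (rows == null) {
            throw new NoSuchElementException("PID not found: " + pid);
        }
        return rows;
    }

    public static void main(String[] args) {
        ProducerModel producer = new ProducerModel();
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"foo", "bar"});
        rows.add(new String[] {"john", "doo"});
        producer.register("my.karaf.configuration", rows);
        for (Info info : producer.listFlights()) {
            // A client would call getStream() with the ticket found in the FlightInfo.
            System.out.println(info.path + ": " + producer.getStream(info.ticket).size() + " rows");
        }
    }
}
```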
Let's take a look at the ConfigProducer code:
package net.nanthrax.karaf.arrow;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.osgi.framework.BundleContext;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class ConfigProducer extends NoOpFlightProducer implements AutoCloseable {
private final static Logger LOGGER = LoggerFactory.getLogger(ConfigProducer.class);
private final BufferAllocator allocator;
private final Location location;
private final ConcurrentMap<FlightDescriptor, ConfigDataset> datasets;
public ConfigProducer(BufferAllocator allocator, Location location) {
this.allocator = allocator;
this.datasets = new ConcurrentHashMap<>();
this.location = location;
}
public void init(BundleContext bundleContext) throws IllegalStateException, IOException, InvalidSyntaxException {
ServiceReference<ConfigurationAdmin> serviceReference = bundleContext.getServiceReference(ConfigurationAdmin.class);
if (serviceReference == null) {
throw new IllegalStateException("ConfigurationAdmin service is not available");
}
ConfigurationAdmin configurationAdmin = bundleContext.getService(serviceReference);
if (configurationAdmin == null) {
throw new IllegalStateException("ConfigurationAdmin service is not available");
}
init(configurationAdmin);
}
public void init(ConfigurationAdmin configurationAdmin) throws IllegalStateException, IOException, InvalidSyntaxException {
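Configuration[] configurations = configurationAdmin.listConfigurations(null);
// listConfigurations() returns null (not an empty array) when no configuration exists
if (configurations == null) {
return;
}
for (Configuration configuration : configurations) {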
FlightDescriptor flightDescriptor = FlightDescriptor.path(configuration.getPid());
LOGGER.info("Processing configuration {}", flightDescriptor.getPath());
ConfigDataset dataset = new ConfigDataset(configuration, allocator);
datasets.put(flightDescriptor, dataset);
}
}
@Override
public void getStream(CallContext callContext, Ticket ticket, ServerStreamListener listener) {
String pid = new String(ticket.getBytes(), StandardCharsets.UTF_8);
LOGGER.info("Get stream for PID {}", pid);
FlightDescriptor flightDescriptor = FlightDescriptor.path(pid);
ConfigDataset dataset = this.datasets.get(flightDescriptor);
if (dataset == null) {
throw CallStatus.NOT_FOUND.withDescription("PID not found").toRuntimeException();
}
listener.start(dataset.getRoot());
LOGGER.info(" Row count: {}", dataset.getRoot().getRowCount());
listener.putNext();
listener.completed();
}
@Override
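public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
ConfigDataset dataset = datasets.get(descriptor);
if (dataset == null) {
throw CallStatus.NOT_FOUND.withDescription("PID not found").toRuntimeException();
}
// the ticket carries the PID (first path element), redeemed later by getStream()
FlightEndpoint flightEndpoint = new FlightEndpoint(new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)), location);
return new FlightInfo(
dataset.getSchema(),
descriptor,
Collections.singletonList(flightEndpoint),
-1,
dataset.getRows()
);
}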
@Override
public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
datasets.forEach((k, v) -> { listener.onNext(getFlightInfo(null, k));});
listener.onCompleted();
}
public ConfigDataset getConfigDataset(String pid) {
FlightDescriptor flightDescriptor = FlightDescriptor.path(pid);
return datasets.get(flightDescriptor);
}
@Override
public void close() throws Exception {
AutoCloseables.close(datasets.values());
}
}
In the init() method, we get the ConfigurationAdmin Karaf service and populate the datasets map. This map contains all ConfigDatasets, keyed by a FlightDescriptor built from the PID. As we will see later, the init() method is called in the bundle activator.
The getFlightInfo() method uses the FlightDescriptor to retrieve the corresponding ConfigDataset from the datasets map. It creates a FlightEndpoint combining the server location and a Ticket containing the PID (the first element of the descriptor path), and uses the ConfigDataset to get the schema and the number of rows.
The listFlights() method calls getFlightInfo() for each dataset and sends each resulting FlightInfo to the provided StreamListener, before completing the listener.
The "main" method is getStream(): it gets the PID from the ticket provided by the client and uses it to retrieve the corresponding ConfigDataset from the datasets map. If the PID doesn't exist in the map, we throw CallStatus.NOT_FOUND.withDescription("PID not found").toRuntimeException(). If the PID is found, we retrieve its VectorSchemaRoot, start the provided ServerStreamListener with it, send the data to the client (listener.putNext()), and complete (close) the stream with listener.completed().
Starting Arrow Flight Server
Finally, we need to "connect the dots" (injecting the ConfigurationAdmin Karaf service) and start the Arrow Flight server. This is done in the Activator.
In the start() method of the Activator:
- we create the root Arrow allocator (that we pass to the other components)
- we create the location where the Flight server will listen
- we create and init the ConfigProducer
- and finally, we create and start an Arrow Flight server
package net.nanthrax.karaf.arrow;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.Location;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Activator implements BundleActivator {
private final static Logger LOGGER = LoggerFactory.getLogger(Activator.class);
private BufferAllocator allocator;
private ConfigProducer configProducer;
private FlightServer flightServer;
@Override
public void start(BundleContext context) throws Exception {
allocator = new RootAllocator();
Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
configProducer = new ConfigProducer(allocator, location);
configProducer.init(context);
flightServer = FlightServer.builder(allocator, location, configProducer).build();
flightServer.start();
LOGGER.info("Starting Arrow Flight Server on port {}", flightServer.getPort());
}
@Override
public void stop(BundleContext context) throws Exception {
LOGGER.info("Stopping Arrow Flight Server");
// close() shuts the server down and waits for termination before the allocator is closed
flightServer.close();
configProducer.close();
allocator.close();
}
}
The Activator is declared in the OSGi bundle headers with the Bundle-Activator header.
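The stop() method releases resources in the reverse order of their creation: the server first, then the producer (which closes the vectors), and the root allocator last, since closing an allocator that still has outstanding buffers makes Arrow report a memory leak. One generic way to enforce that ordering is a stack of AutoCloseable resources. The following is a plain-Java sketch (hypothetical LifecycleModel class, no Arrow or OSGi dependency), not the code used in the repository:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java sketch: resources are registered in creation order and closed in
// reverse order, like Activator.stop() does by hand.
class LifecycleModel implements AutoCloseable {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    void register(AutoCloseable resource) {
        resources.push(resource); // the most recently created resource is closed first
    }

    @Override
    public void close() throws Exception {
        Exception failure = null;
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                // keep closing the remaining resources, report the first failure
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> closed = new ArrayList<>();
        try (LifecycleModel lifecycle = new LifecycleModel()) {
            lifecycle.register(() -> closed.add("allocator")); // created first
            lifecycle.register(() -> closed.add("producer"));
            lifecycle.register(() -> closed.add("server"));    // created last
        }
        System.out.println(closed); // [server, producer, allocator]
    }
}
```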
Deploy in Apache Karaf runtime
For convenience, we create a Karaf features repository for deployment. Even though the config-flight-provider bundle embeds most of its dependencies as private packages, it still imports some packages, meaning that other bundles have to be deployed first in the runtime. A features repository simplifies the deployment:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<features name="karaf-arrow-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.3.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.3.0 http://karaf.apache.org/xmlns/features/v1.3.0">
<feature name="karaf-arrow-server" version="${project.version}">
<bundle dependency="true">mvn:javax.annotation/javax.annotation-api/1.3.2</bundle>
<bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-core/2.15.3</bundle>
<bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-annotations/2.15.3</bundle>
<bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-databind/2.15.3</bundle>
<bundle dependency="true">mvn:com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.15.3</bundle>
<bundle dependency="true">mvn:commons-codec/commons-codec/1.16.0</bundle>
<bundle>mvn:net.nanthrax.karaf.arrow/config-flight-provider/${project.version}</bundle>
</feature>
</features>
Let's use the Apache Karaf 4.4.4 runtime. Download Apache Karaf (from https://www.apache.org/dyn/closer.lua/karaf/4.4.4/apache-karaf-4.4.4.tar.gz) and extract it in the folder of your choice.
When using Apache Arrow with Java 9+, we have to open the java.nio package at the JVM level. In Apache Karaf, to do that, edit the bin/karaf script and add:
--add-opens=java.base/java.nio=ALL-UNNAMED
in the start section.
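Arrow's Java memory code accesses java.nio internals reflectively, which is what this option allows. To check that the option is in effect, a small Java 9+ sketch like the following (hypothetical NioOpenCheck class) asks the java.base module whether java.nio is open to the unnamed module of the calling class, i.e. to class-path code. On Java 17 and later, it should print false unless the option is set.

```java
import java.nio.ByteBuffer;

// Java 9+ check: is the java.nio package (in java.base) open to the unnamed module,
// i.e. to class-path code, as --add-opens=java.base/java.nio=ALL-UNNAMED requests?
class NioOpenCheck {
    static boolean javaNioOpenToUnnamed() {
        Module javaBase = ByteBuffer.class.getModule();
        // A class loaded from the class path belongs to its class loader's unnamed module.
        Module unnamed = NioOpenCheck.class.getModule();
        return javaBase.isOpen("java.nio", unnamed);
    }

    public static void main(String[] args) {
        System.out.println("java.nio open to ALL-UNNAMED: " + javaNioOpenToUnnamed());
    }
}
```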
Now, we can start the Karaf runtime with:
$ bin/karaf
__ __ ____
/ //_/____ __________ _/ __/
/ ,< / __ `/ ___/ __ `/ /_
/ /| |/ /_/ / / / /_/ / __/
/_/ |_|\__,_/_/ \__,_/_/
Apache Karaf (4.4.4)
Hit '<tab>' for a list of available commands
and '[cmd] --help' for help on a specific command.
Hit '<ctrl-d>' or type 'system:shutdown' or 'logout' to shutdown Karaf.
karaf@root()>
Now, we can register our features repository and deploy our bundle via the karaf-arrow-server feature:
karaf@root()> feature:repo-add mvn:net.nanthrax.karaf.arrow/karaf-arrow/1.0-SNAPSHOT/xml/features
Adding feature url mvn:net.nanthrax.karaf.arrow/karaf-arrow/1.0-SNAPSHOT/xml/features
karaf@root()> feature:install karaf-arrow-server
In the log (you can use the log:tail or log:display command), you can see that the Arrow Flight server has started:
17:57:06.021 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.config]
17:57:06.021 INFO [FelixStartLevel] Processing configuration [profile]
17:57:06.021 INFO [FelixStartLevel] Processing configuration [jmx.acl.osgi.compendium.cm]
17:57:06.022 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.shell]
17:57:06.022 INFO [FelixStartLevel] Processing configuration [org.ops4j.pax.url.mvn]
17:57:06.023 INFO [FelixStartLevel] Processing configuration [jmx.acl.org.apache.karaf.bundle]
17:57:06.023 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.system]
17:57:06.023 INFO [FelixStartLevel] Processing configuration [org.ops4j.pax.logging]
17:57:06.024 INFO [FelixStartLevel] Processing configuration [jmx.acl]
17:57:06.024 INFO [FelixStartLevel] Processing configuration [jmx.acl.org.apache.camel]
17:57:06.024 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.kar]
17:57:06.025 INFO [FelixStartLevel] Processing configuration [org.apache.karaf.command.acl.feature]
17:57:06.104 INFO [FelixStartLevel] Starting Arrow Flight Server on port 33333
Now, we can use an Arrow Flight client to interact with the server.
Arrow Flight Client
Implementing an Arrow Flight client is pretty easy. In this blog post, we write a simple CLI client that interacts with the Flight server running in the Karaf runtime. The client does the following:
- create an Arrow Flight client using the location of our Flight server (localhost:33333 in our case)
- retrieve the FlightInfo using the PID of the configuration we want to get
- using the ticket from the FlightInfo endpoint, get the FlightStream from the client
- using the FlightStream, directly get the VectorSchemaRoot provided by the server
We run the client (using java -jar for instance). We can see on the console:
[main] INFO org.apache.arrow.memory.BaseAllocator - Debug mode disabled. Enable with the VM option -Darrow.memory.debug.allocator=true.
[main] INFO org.apache.arrow.memory.DefaultAllocationManagerOption - allocation manager type not specified, using netty as the default type
[main] INFO org.apache.arrow.memory.CheckAllocator - Using DefaultAllocationManager at memory-netty/14.0.1/arrow-memory-netty-14.0.1.jar!/org/apache/arrow/memory/DefaultAllocationManagerFactory.class
Client connected to grpc+tcp://localhost:33333
Flight info: FlightInfo{schema=Schema, descriptor=org.apache.karaf.shell, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@3369b90d, expirationTime=(none)}], bytes=-1, records=12, ordered=false}
Row count: 12
key value
completionMode GLOBAL
disableEofExit false
disableLogout false
felix.fileinstall.filename file:/Users/jbonofre/Downloads/apache-karaf-4.4.4/etc/org.apache.karaf.shell.cfg
hostKey /Users/jbonofre/Downloads/apache-karaf-4.4.4/etc/host.key
service.pid org.apache.karaf.shell
sftpEnabled true
sshHost 0.0.0.0
sshIdleTimeout 1800000
sshPort 8101
sshRealm karaf
sshRole ssh
We retrieve all properties from the org.apache.karaf.shell configuration remotely using Arrow Flight !
This blog post is the first one of a series about Apache Arrow, Arrow Flight, Arrow Flight SQL, etc.
Enjoy !
