Skip to content

Commit

Permalink
merge(travis): pull changes from uptake#28;
Browse files Browse the repository at this point in the history
  • Loading branch information
mjermann committed May 26, 2019
2 parents 86a75e3 + 2760b8a commit 7281561
Show file tree
Hide file tree
Showing 19 changed files with 49 additions and 64 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
*.o
*.so
*.DS_Store
fRanz/inst/librdkafka*
franz/src/librdkafka
librdkafka*
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
fRanz is an open source R kafka client that allows users to read and write messages from kafka. It leverages the stability and performance of [librdkafka](https://github.com/edenhill/librdkafka) and implements ididiomatic R workflows ontop of it.


## Installation

We're working on it. Currently you need librdkafka as a system available library in order to load the R package. In order to install from source you also need the headers. A make recipe in the top level `make librdkafka` should work for *nix systems.


No attempt has been made for windows compatability.

## Example of sending and reading a message

```r
Expand Down
2 changes: 1 addition & 1 deletion fRanz/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: fRanz
Type: Package
Title: An R Kafka Client
Title: An R Kafka Client.
Version: 0.1.0
Date: 2019-05-13
Authors@R: c(
Expand Down
3 changes: 3 additions & 0 deletions fRanz/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

export(GetRdConsumer)
export(GetRdProducer)
export(KafkaBroker)
export(KafkaConsume)
export(KafkaConsumer)
export(KafkaProduce)
export(KafkaProducer)
export(RdSubscribe)
importFrom(R6,R6Class)
importFrom(Rcpp,sourceCpp)
Expand Down
3 changes: 2 additions & 1 deletion fRanz/R/KafkaBroker.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#' @title KafkaBroker
#' @name Kafka Broker
#' @name KafkaBroker
#' @description TDB
#' @export
#' @importFrom R6 R6Class
KafkaBroker <- R6::R6Class(
classname = "KafkaBroker"
Expand Down
1 change: 1 addition & 0 deletions fRanz/R/KafkaConsumer.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#' @title Kakfa Consumer
#' @name KafkaConsumer
#' @description TDB
#' @export
#' @importFrom R6 R6Class
KafkaConsumer <- R6::R6Class(
classname = "KafkaConsumer"
Expand Down
1 change: 1 addition & 0 deletions fRanz/R/KafkaProducer.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#' @title Kakfa Producer
#' @name KafkaProducer
#' @description TDB
#' @export
#' @importFrom R6 R6Class
KafkaProducer <- R6::R6Class(
classname = "KafkaProducer"
Expand Down
11 changes: 6 additions & 5 deletions fRanz/R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#' @title GetRdConsumer
#' @name GetRdConsumer
#' @description Creates an Rcpp::XPtr<RdKafka::Consumer>. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
#' @description Creates an Rcpp::XPtr<RdKafka::Consumer>. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
#' @param keys a character vector indicating option keys to parameterize the RdKafka::Consumer
#' @param values a character vector indicating option values to parameterize the RdKafka::Consumer. Must be of same length as keys.
#' @return a Rcpp::XPtr<RdKafka::Consumer>
Expand All @@ -26,18 +26,19 @@ RdSubscribe <- function(consumerPtr, Rtopics) {

#' @title KafkaConsume
#' @name KafkaConsume
#' @description
#' @description A method to that takes a consumer pointer and returns at most the specified number of results, unless the timeout is reached
#' @param consumerPtr a reference to a Rcpp::XPtr<RdKafka::KafkaConsumer>
#' @param numResults how many results should be consumed before returning. Will return early if offset is at maximum
#' @param timeout the timeout in milliseconds. Default is 10000
#' @return a list of length numResults with values list(key=key,value=value)
#' @export
KafkaConsume <- function(consumerPtr, numResults) {
.Call('_fRanz_KafkaConsume', PACKAGE = 'fRanz', consumerPtr, numResults)
KafkaConsume <- function(consumerPtr, numResults, timeout = 10000L) {
.Call('_fRanz_KafkaConsume', PACKAGE = 'fRanz', consumerPtr, numResults, timeout)
}

#' @title GetRdProducer
#' @name GetRdProducer
#' @description Creates an Rcpp::XPtr<RdKafka::Producer>. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
#' @description Creates an Rcpp::XPtr<RdKafka::Producer>. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
#' @param keys a character vector indicating option keys to parameterize the RdKafka::Producer
#' @param values a character vector indicating option values to parameterize the RdKafka::Producer. Must be of same length as keys.
#' @return a Rcpp::XPtr<RdKafka::Producer>
Expand Down
2 changes: 1 addition & 1 deletion fRanz/man/GetRdConsumer.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion fRanz/man/GetRdProducer.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions fRanz/man/Kafka-Broker.Rd → fRanz/man/KafkaBroker.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions fRanz/man/KafkaConsume.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 2 additions & 23 deletions fRanz/src/Makevars
Original file line number Diff line number Diff line change
@@ -1,23 +1,2 @@
INSTALLDIR = $(HOME)/.fRanz/librdkafka
LIBRDKAFKADIR = $(PWD)/../inst/librdkafka-0.11.6

PKG_LIBS = -L$(INSTALLDIR)/src-cpp -lrdkafka++
PKG_CXXFLAGS = -std=c++11 -I$(INSTALLDIR)/src-cpp

.PHONY: all install_librdkadka

all: install_librdkadka

install_librdkadka:
if [ ! -s $(INSTALLDIR)/src-cpp/librdkafka++.a ] ; \
then \
mkdir -p $(INSTALLDIR) && \
cd $(shell dirname $(LIBRDKAFKADIR)) && \
tar xzf $(LIBRDKAFKADIR).tar.gz && \
cd $(LIBRDKAFKADIR) && \
./configure && \
$(MAKE) && \
$(MAKE) install && \
mv * $(INSTALLDIR) ; \
fi

PKG_LIBS = -lrdkafka++
CXX_STD=CXX11
9 changes: 5 additions & 4 deletions fRanz/src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ BEGIN_RCPP
END_RCPP
}
// KafkaConsume
Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults);
RcppExport SEXP _fRanz_KafkaConsume(SEXP consumerPtrSEXP, SEXP numResultsSEXP) {
Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults, int timeout);
RcppExport SEXP _fRanz_KafkaConsume(SEXP consumerPtrSEXP, SEXP numResultsSEXP, SEXP timeoutSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< SEXP >::type consumerPtr(consumerPtrSEXP);
Rcpp::traits::input_parameter< int >::type numResults(numResultsSEXP);
rcpp_result_gen = Rcpp::wrap(KafkaConsume(consumerPtr, numResults));
Rcpp::traits::input_parameter< int >::type timeout(timeoutSEXP);
rcpp_result_gen = Rcpp::wrap(KafkaConsume(consumerPtr, numResults, timeout));
return rcpp_result_gen;
END_RCPP
}
Expand Down Expand Up @@ -72,7 +73,7 @@ END_RCPP
static const R_CallMethodDef CallEntries[] = {
{"_fRanz_GetRdConsumer", (DL_FUNC) &_fRanz_GetRdConsumer, 2},
{"_fRanz_RdSubscribe", (DL_FUNC) &_fRanz_RdSubscribe, 2},
{"_fRanz_KafkaConsume", (DL_FUNC) &_fRanz_KafkaConsume, 2},
{"_fRanz_KafkaConsume", (DL_FUNC) &_fRanz_KafkaConsume, 3},
{"_fRanz_GetRdProducer", (DL_FUNC) &_fRanz_GetRdProducer, 2},
{"_fRanz_KafkaProduce", (DL_FUNC) &_fRanz_KafkaProduce, 5},
{NULL, NULL, 0}
Expand Down
19 changes: 6 additions & 13 deletions fRanz/src/consumer.cpp
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
#include <rdkafkacpp.h>
#include <librdkafka/rdkafkacpp.h>
#include <Rcpp.h>
#include "utils.h"
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <thread>
#include <chrono>
#include <cstring>

////////////////////////////////////////////////////////////////////////////////////////
//' @title GetRdConsumer
//' @name GetRdConsumer
//' @description Creates an Rcpp::XPtr<RdKafka::Consumer>. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
//' @description Creates an Rcpp::XPtr<RdKafka::Consumer>. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
//' @param keys a character vector indicating option keys to parameterize the RdKafka::Consumer
//' @param values a character vector indicating option values to parameterize the RdKafka::Consumer. Must be of same length as keys.
//' @return a Rcpp::XPtr<RdKafka::Consumer>
Expand Down Expand Up @@ -52,18 +44,19 @@ int RdSubscribe(SEXP consumerPtr, const Rcpp::StringVector Rtopics) {

//' @title KafkaConsume
//' @name KafkaConsume
//' @description
//' @description A method to that takes a consumer pointer and returns at most the specified number of results, unless the timeout is reached
//' @param consumerPtr a reference to a Rcpp::XPtr<RdKafka::KafkaConsumer>
//' @param numResults how many results should be consumed before returning. Will return early if offset is at maximum
//' @param timeout the timeout in milliseconds. Default is 10000
//' @return a list of length numResults with values list(key=key,value=value)
//' @export
// [[Rcpp::export]]
Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults) {
Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults, int timeout = 10000) {
Rcpp::XPtr<RdKafka::KafkaConsumer> consumer(consumerPtr);

Rcpp::List messages(numResults);
for(int i = 0; i < numResults; i++) {
RdKafka::Message *msg = consumer->consume(10000);
RdKafka::Message *msg = consumer->consume(timeout);
switch(msg->err()){
case RdKafka::ERR_NO_ERROR: {
Rcpp::List message = Rcpp::List::create(Rcpp::Named("key") = *msg->key(),
Expand Down
10 changes: 2 additions & 8 deletions fRanz/src/producer.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
#include <rdkafkacpp.h>
#include <librdkafka/rdkafkacpp.h>
#include <Rcpp.h>
#include "utils.h"
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>

//' @title GetRdProducer
//' @name GetRdProducer
//' @description Creates an Rcpp::XPtr<RdKafka::Producer>. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
//' @description Creates an Rcpp::XPtr<RdKafka::Producer>. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
//' @param keys a character vector indicating option keys to parameterize the RdKafka::Producer
//' @param values a character vector indicating option values to parameterize the RdKafka::Producer. Must be of same length as keys.
//' @return a Rcpp::XPtr<RdKafka::Producer>
Expand Down
2 changes: 1 addition & 1 deletion fRanz/src/utils.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include <Rcpp.h>
#include <rdkafkacpp.h>
#include <librdkafka/rdkafkacpp.h>



Expand Down
2 changes: 1 addition & 1 deletion fRanz/src/utils.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include <Rcpp.h>
#include <rdkafkacpp.h>
#include <librdkafka/rdkafkacpp.h>

RdKafka::Conf* MakeKafkaConfig(Rcpp::StringVector keys, Rcpp::StringVector values);
2 changes: 2 additions & 0 deletions fRanz/tests/testthat/test-KafkaConsumer.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ context("KafkaConsumer and KafkaProducer")


testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming 1 message",{
testthat::skip_on_cran()
## Standard Set Up
topic <- uuid::UUIDgenerate()
group <- uuid::UUIDgenerate()
Expand All @@ -25,6 +26,7 @@ testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming 1 me


testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming random number of messages",{
testthat::skip_on_cran()
## Standard Set Up
topic <- uuid::UUIDgenerate()
group <- uuid::UUIDgenerate()
Expand Down

0 comments on commit 7281561

Please sign in to comment.