
io.confluent.connect.storage.errors.PartitionException: Error encoding partition #131

Open
Adeljoo opened this issue Apr 10, 2020 · 14 comments


@Adeljoo

Adeljoo commented Apr 10, 2020

I added my custom partitioner, and when the connector writes from the topic I see the following error:

```
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:559)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:315)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:186)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: io.confluent.connect.storage.errors.PartitionException: Error encoding partition.
	at com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner$PartitionFieldExtractor.extract(FieldAndTimeBasedPartitioner.java:153)
	at com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner.encodePartition(FieldAndTimeBasedPartitioner.java:97)
	at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:205)
	at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:176)
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:195)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:537)
	... 10 more
```
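The `Caused by` frame points at `PartitionFieldExtractor.extract`, which in field-based partitioners of this style typically fails when the record value is not structured, or when the configured field is missing or null. The following is a simplified, hypothetical sketch of that failure mode (the class and method here are illustrative stand-ins, not the real code), using a plain `Map` in place of a Connect `SinkRecord`:

```java
import java.util.Map;

public class PartitionFieldSketch {
    // Hypothetical, simplified stand-in for PartitionFieldExtractor.extract():
    // the frame at FieldAndTimeBasedPartitioner.java:153 typically fails when
    // the record value is not structured (Struct/Map) or the configured field
    // ("appId" in this config) is missing or null.
    static String extract(Object recordValue, String fieldName) {
        if (recordValue instanceof Map) {
            Object field = ((Map<?, ?>) recordValue).get(fieldName);
            if (field != null) {
                return field.toString();
            }
        }
        // The real extractor surfaces this as a PartitionException
        // ("Error encoding partition."); this sketch mirrors the failure
        // with an IllegalStateException.
        throw new IllegalStateException(
                "Error encoding partition: field '" + fieldName + "' not found in " + recordValue);
    }

    public static void main(String[] args) {
        System.out.println(extract(Map.of("appId", "myApp"), "appId")); // → myApp
    }
}
```

So a record without an `appId` field, or with an unstructured value (e.g. a plain string when the converter is not producing structured data), is the first thing to check.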

Connector configuration (submitted as JSON via curl):

```shell
for i in "${!KCONNECT_NODES[@]}"; do
    curl ${KCONNECT_NODES[$i]}/connectors -XPOST -H 'Content-type: application/json' -H 'Accept: application/json' -d '{
        "name": "connect-s3-sink-'$i'",
        "config": {
            "topics": "events",
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max" : 4,
            "flush.size": 100,
            "rotate.schedule.interval.ms": "-1",
            "rotate.interval.ms": "-1",
            "s3.region" : "eu-west-1",
            "s3.bucket.name" : "byob-raw",
            "s3.compression.type": "gzip",
            "topics.dir": "topics",
            "storage.class" : "io.confluent.connect.s3.storage.S3Storage",
            "partitioner.class": "com.accelerator.kafka.connect.FieldAndTimeBasedPartitioner",
            "partition.duration.ms" : "3600000",
            "path.format": "YYYY-MM-dd",
            "locale" : "US",
            "timezone" : "UTC",
            "schema.compatibility": "NONE",
            "format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
            "timestamp.extractor": "Record",
            "partition.field" : "appId"
        }
    }'
done
```
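For reference, with `partition.field` set to `appId`, `path.format` set to `YYYY-MM-dd`, and a UTC timezone, a field-and-time partitioner should group S3 keys first by the field value and then by day. A minimal sketch of the expected key prefix, assuming Joda-Time's `YYYY-MM-dd` behaves like `java.time`'s `yyyy-MM-dd` for calendar dates (class and method names here are illustrative, not part of the connector):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionPathSketch {
    // Approximates the S3 key prefix the partitioner should build from this
    // config: <topics.dir>/<topic>/<field>/<formatted date>.
    static String prefix(String topicsDir, String topic, String fieldValue, long timestampMs) {
        String day = DateTimeFormatter.ofPattern("yyyy-MM-dd")
                .withZone(ZoneOffset.UTC)          // "timezone": "UTC"
                .format(Instant.ofEpochMilli(timestampMs));
        return topicsDir + "/" + topic + "/" + fieldValue + "/" + day;
    }

    public static void main(String[] args) {
        // A record with appId "myApp" produced at 2020-04-10T00:00:00Z:
        System.out.println(prefix("topics", "events", "appId=myApp", 1586476800000L));
        // → topics/events/appId=myApp/2020-04-10
    }
}
```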
@OneCricketeer

I don't think this is an issue with the repo.

Can you please show your partitioner?

Did you use the code in #73?

@OneCricketeer

You should also try debugging your own code and/or write more unit tests:

https://stackoverflow.com/questions/45717658/what-is-a-simple-effective-way-to-debug-custom-kafka-connectors

@Adeljoo

Adeljoo commented Apr 14, 2020

@Cricket007 thank you for the link. I tried to debug, and it still throws the same error. Here is the code it complains about:
```java
public String encodePartition(SinkRecord sinkRecord) {
    final Long timestamp = this.timestampExtractor.extract(sinkRecord);
    final String partitionFieldValue = this.partitionFieldExtractor.extract(sinkRecord);
    return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
}
```

I am not sure whether I should change something in the JSON config or whether something else caused the problem.

@Adeljoo

Adeljoo commented Apr 14, 2020

> I don't think this is an issue with the repo.
>
> Can you please show your partitioner?
>
> Did you use the code in #73?

Yes.

@OneCricketeer

Are you able to run the unit tests?

Can you fork that branch and run a build with your own test that produces the exception?

@Adeljoo

Adeljoo commented Apr 15, 2020

Sorry, I used the following code, not the one in #73:
https://github.com/canelmas/kafka-connect-field-and-time-partitioner

@OneCricketeer

OneCricketeer commented Apr 15, 2020

Are you sure?

`com.canelmas.kafka.connect` (that repo) does not match `com.accelerator.kafka.connect` (your stack trace).

@Adeljoo

Adeljoo commented Apr 15, 2020

@Cricket007 I changed the package name and also changed the config to match. I have the line `package com.accelerator.kafka.connect;` in my Java class.

@Adeljoo

Adeljoo commented Apr 15, 2020

@Cricket007 that's interesting. I ran `mvn test` and this is the outcome:

```
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.accelerator.test.kafka.connect.FieldAndTimeBasedPartitionerTest
Tests run: 0, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.006 sec - in com.accelerator.test.kafka.connect.FieldAndTimeBasedPartitionerTest

Results :

Tests run: 0, Failures: 0, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ---------------------------------------------
```

@OneCricketeer

No tests ran... I was asking you to write one of your own, with a sample of a record that you expect to get partitioned.
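A standalone test along those lines does not need the full Connect runtime: feed a sample record (here simplified to a `Map`) through the partition-encoding logic and assert on the result. This is a hypothetical sketch only; `encodePartition` below is a stand-in for the real method, with the hourly bucket mirroring `partition.duration.ms = 3600000` from the config:

```java
import java.util.Map;

public class FieldAndTimePartitionerSketchTest {
    // Hypothetical stand-in for the partitioner's encodePartition(): combines
    // the partition.field value with an hourly time bucket. Names are
    // illustrative only, not the real partitioner API.
    static String encodePartition(Map<String, ?> recordValue, long timestampMs) {
        Object appId = recordValue.get("appId");
        if (appId == null) {
            throw new IllegalStateException("Error encoding partition: missing field appId");
        }
        long hourBucket = timestampMs / 3_600_000L; // partition.duration.ms = 3600000
        return appId + "/" + hourBucket;
    }

    public static void main(String[] args) {
        // A sample record that should partition cleanly:
        System.out.println(encodePartition(Map.of("appId", "myApp"), 1586476800000L));
        // A sample record that reproduces the reported failure mode:
        try {
            encodePartition(Map.of("other", 1), 1586476800000L);
            throw new AssertionError("expected a partition-encoding failure");
        } catch (IllegalStateException expected) {
            System.out.println("reproduced: " + expected.getMessage());
        }
    }
}
```

Running a record with the field present and one without it covers both the happy path and the exception seen in the stack trace.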

@Adeljoo

Adeljoo commented Apr 16, 2020

How can I run such a test? Is there any documentation available?

@OneCricketeer

OneCricketeer commented Apr 17, 2020

Tests are part of the repo... You have used Maven before, right?

https://github.com/confluentinc/kafka-connect-storage-common/tree/master/core/src/test

It's not clear what you've cloned or built if tests aren't running for you.

@Adeljoo

Adeljoo commented Apr 20, 2020

I followed the instructions for building the common storage:

https://github.com/confluentinc/kafka-connect-storage-common/wiki/FAQ

And this is the new pom used to run the tests:


```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.accelerator.kafka</groupId>
    <artifactId>kafka-connect-storage-partitioner</artifactId>
    <name>kafka-connect-storage-partitioner</name>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>1.6</maven.compiler.source>
        <maven.compiler.target>1.6</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-connect-storage-partitioner</artifactId>
            <version>5.4.1</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>2.0.0-alpha1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-connect-storage-common</artifactId>
            <version>5.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.0.0-M4</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <dependencies>
                    <dependency>
                        <groupId>org.junit.platform</groupId>
                        <artifactId>junit-platform-surefire-provider</artifactId>
                        <version>1.0.0-M4</version>
                    </dependency>
                    <dependency>
                        <groupId>org.junit.jupiter</groupId>
                        <artifactId>junit-jupiter-engine</artifactId>
                        <version>5.0.0-M4</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>
</project>
```

@OneCricketeer

But you don't need to do this. You can fork this repo as-is, then add your own partitioner and tests.
