Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

using countdownlatch to hold the calling thread copying ObjC sdk #822

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 66 additions & 20 deletions src/main/java/com/mixpanel/android/mpmetrics/AnalyticsMessages.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import android.os.Looper;
import android.os.Message;
import android.os.Process;
import android.os.Bundle;
import android.util.DisplayMetrics;

import com.mixpanel.android.util.Base64Coder;
Expand All @@ -26,6 +27,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import javax.net.ssl.SSLSocketFactory;

Expand Down Expand Up @@ -117,13 +119,44 @@ public void clearAnonymousUpdatesMessage(final MixpanelDescription clearAnonymou

public void postToServer(final MixpanelDescription flushDescription) {
final Message m = Message.obtain();
Bundle data = new Bundle();
data.putString("token", flushDescription.getToken());
m.setData(data);
m.what = FLUSH_QUEUE;
m.obj = flushDescription.getToken();
m.arg1 = 0;
m.arg2 = 0;

mWorker.runMessage(m);
}

public Integer postToServer(final MixpanelDescription flushDescription, boolean sync) {
if (!sync) {
postToServer(flushDescription);
return -1;
}

final Message m = Message.obtain();
Bundle data = new Bundle();
data.putString("token", flushDescription.getToken());
m.setData(data);
m.what = FLUSH_QUEUE;
m.obj = new CountDownLatch(1);
m.arg1 = 0;
m.arg2 = 1;

mWorker.runMessage(m);
Integer resultCode = data.getInt("returnCode");

logAboutMessageToMixpanel("Status Code Main Thread: " + resultCode);
logAboutMessageToMixpanel("bundle accessing from post to server: " + resultCode);

try {
return resultCode.intValue();
} catch(NullPointerException e) {
return -1;
}
}

public void emptyTrackingQueues(final MixpanelDescription mixpanelDescription) {
final Message m = Message.obtain();
m.what = EMPTY_QUEUES;
Expand Down Expand Up @@ -341,6 +374,14 @@ public void runMessage(Message msg) {
logAboutMessageToMixpanel("Dead mixpanel worker dropping a message: " + msg.what);
} else {
mHandler.sendMessage(msg);
if (msg.arg2 != 0) {
try {
CountDownLatch flushLatch = (CountDownLatch) msg.obj;
flushLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
}
Expand Down Expand Up @@ -416,8 +457,12 @@ public void handleMessage(Message msg) {
} else if (msg.what == FLUSH_QUEUE) {
logAboutMessageToMixpanel("Flushing queue due to scheduled or forced flush");
updateFlushFrequency();
token = (String) msg.obj;
sendAllData(mDbAdapter, token);
token = msg.getData().getString("token");
int res = sendAllData(mDbAdapter, token);
if (msg.arg2 != 0) {
msg.getData().putInt("returnCode", res);
((CountDownLatch) msg.obj).countDown();
}
} else if (msg.what == EMPTY_QUEUES) {
final MixpanelDescription message = (MixpanelDescription) msg.obj;
token = message.getToken();
Expand Down Expand Up @@ -478,26 +523,32 @@ protected long getTrackEngageRetryAfter() {
return mTrackEngageRetryAfter;
}

private void sendAllData(MPDbAdapter dbAdapter, String token) {
private int sendAllData(MPDbAdapter dbAdapter, String token) {
final RemoteService poster = getPoster();
if (!poster.isOnline(mContext, mConfig.getOfflineMode())) {
logAboutMessageToMixpanel("Not flushing data to Mixpanel because the device is not connected to the internet.");
return;
return -1;
}

sendData(dbAdapter, token, MPDbAdapter.Table.EVENTS, mConfig.getEventsEndpoint());
sendData(dbAdapter, token, MPDbAdapter.Table.PEOPLE, mConfig.getPeopleEndpoint());
sendData(dbAdapter, token, MPDbAdapter.Table.GROUPS, mConfig.getGroupsEndpoint());
int eventsSent = sendData(dbAdapter, token, MPDbAdapter.Table.EVENTS, mConfig.getEventsEndpoint());
int peopleSent = sendData(dbAdapter, token, MPDbAdapter.Table.PEOPLE, mConfig.getPeopleEndpoint());
int groupsSent = sendData(dbAdapter, token, MPDbAdapter.Table.GROUPS, mConfig.getGroupsEndpoint());

logAboutMessageToMixpanel("Data flushed to Mixpanel successfully. Events sent: " + eventsSent + ", People sent: " + peopleSent + ", Groups sent: " + groupsSent);
return eventsSent;
}

private void sendData(MPDbAdapter dbAdapter, String token, MPDbAdapter.Table table, String url) {
private int sendData(MPDbAdapter dbAdapter, String token, MPDbAdapter.Table table, String url) {

final RemoteService poster = getPoster();
String[] eventsData = dbAdapter.generateDataString(table, token);
Integer queueCount = 0;
if (eventsData != null) {
queueCount = Integer.valueOf(eventsData[2]);
}

int statusCode = -1; // Initialize with an invalid status code

while (eventsData != null && queueCount > 0) {
final String lastId = eventsData[0];
final String rawMessage = eventsData[1];
Expand All @@ -510,28 +561,22 @@ private void sendData(MPDbAdapter dbAdapter, String token, MPDbAdapter.Table tab
}

boolean deleteEvents = true;
byte[] response;
try {
final SSLSocketFactory socketFactory = mConfig.getSSLSocketFactory();
response = poster.performRequest(url, params, socketFactory);
if (null == response) {
statusCode = poster.performRequest(url, params, socketFactory); // Update the status code
if (-1 == statusCode) {
deleteEvents = false;
logAboutMessageToMixpanel("Response was null, unexpected failure posting to " + url + ".");
} else {
deleteEvents = true; // Delete events on any successful post, regardless of 1 or 0 response
String parsedResponse;
try {
parsedResponse = new String(response, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("UTF not supported on this platform?", e);
}

if (mFailedRetries > 0) {
mFailedRetries = 0;
removeMessages(FLUSH_QUEUE, token);
}

logAboutMessageToMixpanel("Successfully posted to " + url + ": \n" + rawMessage);
logAboutMessageToMixpanel("Response was " + parsedResponse);
logAboutMessageToMixpanel("Response was " + statusCode);
}
} catch (final OutOfMemoryError e) {
MPLog.e(LOGTAG, "Out of memory when posting to " + url + ".", e);
Expand Down Expand Up @@ -570,8 +615,9 @@ private void sendData(MPDbAdapter dbAdapter, String token, MPDbAdapter.Table tab
queueCount = Integer.valueOf(eventsData[2]);
}
}
}

return statusCode; // Return the status code
}
private JSONObject getDefaultEventProperties()
throws JSONException {
final JSONObject ret = new JSONObject();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/mixpanel/android/mpmetrics/MPConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public synchronized void setOfflineMode(OfflineMode offlineMode) {
}

mBulkUploadLimit = metaData.getInt("com.mixpanel.android.MPConfig.BulkUploadLimit", 40); // 40 records default
mFlushInterval = metaData.getInt("com.mixpanel.android.MPConfig.FlushInterval", 60 * 1000); // one minute default
mFlushInterval = metaData.getInt("com.mixpanel.android.MPConfig.FlushInterval", Integer.MAX_VALUE); // one minute default
mFlushBatchSize = metaData.getInt("com.mixpanel.android.MPConfig.FlushBatchSize", 50); // flush 50 events at a time by default
mFlushOnBackground = metaData.getBoolean("com.mixpanel.android.MPConfig.FlushOnBackground", true);
mMinimumDatabaseLimit = metaData.getInt("com.mixpanel.android.MPConfig.MinimumDatabaseLimit", 20 * 1024 * 1024); // 20 Mb
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/com/mixpanel/android/mpmetrics/MixpanelAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import android.os.Bundle;

import com.mixpanel.android.util.MPLog;
import com.mixpanel.android.util.RemoteService;
import com.mixpanel.android.util.HttpService;

import org.json.JSONArray;
import org.json.JSONException;
Expand Down Expand Up @@ -841,6 +843,26 @@ public void flush() {
mMessages.postToServer(new AnalyticsMessages.MixpanelDescription(mToken));
}


/**
* Synchronized flush to make sure all events sent before continuing
* made exclusively for AppDome purposes
*
* @return HTTP status code / -1 if device is failed
*/
public int blockingFlush() {
if (hasOptedOutTracking())
return -1;

final RemoteService poster = new HttpService();
if (!poster.isOnline(mContext, mConfig.getOfflineMode())) {
MPLog.w(LOGTAG, "Not flushing data to Mixpanel because the device is not connected to the internet.");
return 503;
}

return mMessages.postToServer(new AnalyticsMessages.MixpanelDescription(mToken), true);
}

/**
* Returns a json object of the user's current super properties
*
Expand Down
11 changes: 4 additions & 7 deletions src/main/java/com/mixpanel/android/util/HttpService.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ private boolean onOfflineMode(OfflineMode offlineMode) {
}

@Override
public byte[] performRequest(String endpointUrl, Map<String, Object> params, SSLSocketFactory socketFactory) throws ServiceUnavailableException, IOException {
public int performRequest(String endpointUrl, Map<String, Object> params, SSLSocketFactory socketFactory) throws ServiceUnavailableException, IOException {
MPLog.v(LOGTAG, "Attempting request to " + endpointUrl);

byte[] response = null;
int statusCode = -1;

// the while(retries) loop is a workaround for a bug in some Android HttpURLConnection
// libraries- The underlying library will attempt to reuse stale connections,
Expand Down Expand Up @@ -133,10 +133,7 @@ public byte[] performRequest(String endpointUrl, Map<String, Object> params, SSL
out.close();
out = null;
}
in = connection.getInputStream();
response = slurp(in);
in.close();
in = null;
statusCode = connection.getResponseCode();
succeeded = true;
} catch (final EOFException e) {
MPLog.d(LOGTAG, "Failure to connect, likely caused by a known issue with Android lib. Retrying.");
Expand All @@ -162,7 +159,7 @@ public byte[] performRequest(String endpointUrl, Map<String, Object> params, SSL
if (retries >= 3) {
MPLog.v(LOGTAG, "Could not connect to Mixpanel service after three retries.");
}
return response;
return statusCode;
}

private static byte[] slurp(final InputStream inputStream)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/mixpanel/android/util/RemoteService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public interface RemoteService {

void checkIsMixpanelBlocked();

byte[] performRequest(String endpointUrl, Map<String, Object> params, SSLSocketFactory socketFactory)
int performRequest(String endpointUrl, Map<String, Object> params, SSLSocketFactory socketFactory)
throws ServiceUnavailableException, IOException;

class ServiceUnavailableException extends Exception {
Expand Down