Anonymizer#

Phone numbers can be anonymized before or after featurization; this notebook walks through both. Phone number formatting (e.g. the expectation that international calling codes have been stripped) can also be checked during anonymization by passing a format_checker to the anonymizer; this is demonstrated as well.

from cider.datastore import DataStore
from cider.anonymizer import Anonymizer
from pandas.api.types import is_numeric_dtype

import os
import sys

import pandas as pd

# Prevents python version mismatches between spark driver and executor
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

Set up the configuration file and load some simulated data, including featurization results if present, using the datastore.

# This path should point to your cider installation, where configs and data for this demo are located. In particular, this file must
# contain a 
from pathlib import Path
cider_installation_directory = Path('../../cider')

datastore = DataStore(config_file_path_string=cider_installation_directory / 'configs' / 'config_anonymize.yml')
anonymizer = Anonymizer(datastore=datastore)

outputs_path = anonymizer.outputs_path
23/05/24 13:54:44 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.43 instead (on interface en0)
23/05/24 13:54:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/24 13:54:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Loading CDR...
Loading recharges...
SUCCESS!
Loading mobile data...
Loading mobile money...

Anonymize input data#

We can anonymize the five main categories of input data which contain phone numbers:

  • CDR (calls and texts)

  • Mobile money transactions

  • Mobile data transactions

  • Recharges

  • Labels (e.g. from a ground-truth survey)

It’s important to anonymize these using the same anonymization salt, to ensure that all of a given subscriber’s data is indexed by the same obfuscated string.
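Cider's hashing internals aren't shown here, but the reason a shared salt matters can be sketched with a plain salted hash (a minimal illustration, not cider's actual scheme; `pseudonymize` is a hypothetical helper):

```python
import hashlib

def pseudonymize(identifier: str, salt: str) -> str:
    # Hash the salted identifier; the same (identifier, salt) pair always
    # maps to the same digest, keeping one subscriber's records linkable.
    return hashlib.sha256((salt + identifier).encode()).hexdigest()[:16]

# Same number, same salt: the pseudonyms match across datasets.
assert pseudonymize("1712345678", "salt-A") == pseudonymize("1712345678", "salt-A")

# Same number, different salt: the pseudonyms no longer line up.
assert pseudonymize("1712345678", "salt-A") != pseudonymize("1712345678", "salt-B")
```

If, say, CDR were anonymized with one salt and labels with another, a subscriber's records could no longer be joined across the two datasets.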

anonymizer.anonymize_cdr()

anonymized_cdr = pd.read_csv(outputs_path / 'outputs' / 'cdr.csv')
anonymized_cdr.head()
txn_type caller_id recipient_id timestamp duration caller_antenna recipient_antenna international
0 call p7OPQLOrvqqYDvVa 1Z7rQb4GO2jXzbpG 2020-01-01 00:00:42 253.0 a101 a54 domestic
1 text p7OPQLOrVlZYDvVa AbRlyk74NOBJyqwY 2020-01-01 00:02:04 NaN a44 a110 domestic
2 text G5vAyA41G9wqQadO 1xMmQpJkXaVVz9wa 2020-01-01 00:02:12 NaN a145 a96 domestic
3 call nVLoQg1drok6zx3Z KlVDmV2g29mA7Qvq 2020-01-01 00:02:23 96.0 a84 a36 domestic
4 text bRlykJqR9eAXJQqw BWvLye4LxnnOQdYG 2020-01-01 00:03:05 NaN a261 a268 domestic
anonymizer.anonymize_mobilemoney()

anonymized_mobilemoney = pd.read_csv(outputs_path / 'outputs' / 'mobilemoney.csv')
anonymized_mobilemoney.head()
txn_type caller_id recipient_id timestamp amount sender_balance_before sender_balance_after recipient_balance_before recipient_balance_after
0 p2p LxA7zdJNn8enQbVY 35nJzJZNWkPnz16l 2020-01-01 00:15:51 55.163315 324.21875 269.05542 112.54498 167.70830
1 cashout LxA7zdJd7rKPQbVY NaN 2020-01-01 00:18:59 81.861000 185.57620 103.71520 NaN NaN
2 cashin 0vK4yVJnrmaOy71O NaN 2020-01-01 00:28:17 29.225048 98.63829 127.86334 NaN NaN
3 p2p O8xoDrk5vX36QEGL VpAaQawNqaN6ze9q 2020-01-01 00:28:29 45.561913 248.71838 203.15646 212.01639 257.57828
4 cashout NkZlD88rZN0BDG5J NaN 2020-01-01 00:31:03 55.040770 150.88644 95.84567 NaN NaN
anonymizer.anonymize_mobiledata()

anonymized_mobiledata = pd.read_csv(outputs_path / 'outputs' / 'mobiledata.csv')
anonymized_mobiledata.head()
caller_id volume timestamp
0 mpLEyMkZWmNdDqvN 91.38652 2020-01-01 00:01:15
1 NkZlD88rvpbZDG5J 118.89835 2020-01-01 00:02:27
2 BWvLye4EgqAWQdYG 67.68214 2020-01-01 00:16:33
3 WvLyem297x9l6ydY 65.52507 2020-01-01 00:23:37
4 nVLoQg1drok6zx3Z 99.46138 2020-01-01 00:41:44
anonymizer.anonymize_recharges()

anonymized_recharges = pd.read_csv(outputs_path / 'outputs' / 'recharges.csv')
anonymized_recharges.head()
caller_id amount timestamp
0 6aA9yvnL0o61yJMY 96.0 2020-01-01 00:02:47
1 1xMmQpJ74bMkz9wa 73.0 2020-01-01 00:04:33
2 AbRlyk74R1YJyqwY 98.0 2020-01-01 00:08:36
3 1xMmQpJk8ALoz9wa 7.0 2020-01-01 00:14:37
4 1Z7rQb4Nbo1vzbpG 76.0 2020-01-01 00:24:36
anonymizer.anonymize_labels()

anonymized_labels = pd.read_csv(outputs_path / 'outputs' / 'labels.csv')
anonymized_labels.head()
name label weight
0 BKGnyZoaegLxz9Za 18760 3.711266
1 6aA9yvnL8aqOyJMY 21897 69.385541
2 Bj0nDOWV1VNbDK2O 20663 67.035003
3 eALxDXe0WKplQdlp 19756 43.047178
4 p7OPQLOYpMKXDvVa 19978 76.778898
For comparison, we can load the raw (un-anonymized) input files using the paths from the configuration:

raw_cdr_file = pd.read_csv(datastore.cfg.path.input_data.file_paths.cdr)
raw_recharges_file = pd.read_csv(datastore.cfg.path.input_data.file_paths.recharges)
raw_mobiledata_file = pd.read_csv(datastore.cfg.path.input_data.file_paths.mobiledata)
raw_mobilemoney_file = pd.read_csv(datastore.cfg.path.input_data.file_paths.mobilemoney)
raw_labels_file = pd.read_csv(datastore.cfg.path.input_data.file_paths.labels)
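Loading the raw files makes it possible to spot-check that anonymization replaced only the identifier columns. On toy data, and with an illustrative salted hash rather than cider's actual scheme, such a check might look like:

```python
import hashlib

import pandas as pd

raw = pd.DataFrame({
    "caller_id": ["01712345678", "01898765432", "01712345678"],
    "duration": [253.0, 96.0, 12.0],
})

# Replace identifiers with salted-hash pseudonyms, leaving other columns alone.
anonymized = raw.assign(
    caller_id=raw["caller_id"].map(
        lambda n: hashlib.sha256(("salt" + n).encode()).hexdigest()[:16]
    )
)

assert len(anonymized) == len(raw)                     # no rows gained or lost
assert anonymized["duration"].equals(raw["duration"])  # payload columns untouched
# The repeated caller in rows 0 and 2 still shares a single pseudonym.
assert anonymized["caller_id"].iloc[0] == anonymized["caller_id"].iloc[2]
# No raw identifier survives in the output.
assert not anonymized["caller_id"].isin(raw["caller_id"]).any()
```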

Anonymize features#

We can also anonymize featurized data. This is unnecessary (and will fail) if the features were computed from already-anonymized data.

anonymizer.anonymize_features()
anonymized_features = pd.read_csv(outputs_path / 'outputs' / 'features.csv')
anonymized_features.head()
                                                                                
name active_days_allweek_allday active_days_allweek_day active_days_allweek_night active_days_weekday_allday active_days_weekday_day active_days_weekday_night active_days_weekend_allday active_days_weekend_day active_days_weekend_night ... mobilemoney_outgoing_p2p_amount_min mobilemoney_outgoing_p2p_amount_max mobilemoney_outgoing_p2p_balance_before_mean mobilemoney_outgoing_p2p_balance_before_min mobilemoney_outgoing_p2p_balance_before_max mobilemoney_outgoing_p2p_balance_after_mean mobilemoney_outgoing_p2p_balance_after_min mobilemoney_outgoing_p2p_balance_after_max mobilemoney_outgoing_p2p_txns mobilemoney_outgoing_p2p_contacts
0 xMmQpj25daOLRz9w 54 46 46 38 33 32 16 13 14 ... 30.222473 70.803710 206.221286 180.20338 239.67809 158.092635 109.39968 185.19756 6.0 6.0
1 Na6Q4xNdBX5RdQxw 56 51 44 41 36 32 15 15 12 ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
2 nGeQjv2jeVrRBQXW 57 45 45 41 34 32 16 11 13 ... 32.282750 66.296750 194.259438 136.78952 285.25427 147.203631 70.49276 252.97151 6.0 6.0
3 5vAyA0NZP6oZ6Qad 54 47 46 40 35 33 14 12 13 ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
4 pLEyMlebKR37oQqv 53 41 44 39 31 33 14 10 11 ... 43.527973 63.519386 191.142494 132.54959 297.36365 138.052689 69.03021 247.31479 6.0 6.0

5 rows × 1122 columns

Use a custom format checker#

It is a good idea to check the format of numbers as you anonymize them to make sure they have been processed according to expectations. Because the anonymizer uses a hash function, similar numbers will not result in similar anonymized strings. For example, the numbers 1234567, 01234567, and 880 1234567 will all result in completely different anonymized strings.
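This avalanche behavior is easy to see with an ordinary cryptographic hash (illustrative only; cider's hash function may differ):

```python
import hashlib

# Three plausible renderings of "the same" number hash to unrelated strings.
digests = {
    number: hashlib.sha256(number.encode()).hexdigest()
    for number in ["1234567", "01234567", "8801234567"]
}
for number, digest in digests.items():
    print(number, "->", digest[:16])

# No two digests agree, so inconsistent formatting silently creates
# what look like distinct subscribers.
assert len(set(digests.values())) == 3
```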

The Anonymizer object accepts a format checker when it’s constructed. The checker is called on each identifier as a string, and should return True if the format is acceptable and False if not.

Anonymization will fail if any number fails this check, so we encourage you to clean your data before passing it through this step.

This demo is based on a simplified version of Bangladesh’s mobile phone number format. We’re assuming that leading “0”, “880”, and “+” are removed, which means all numbers will be ten digits and will not start with “0”.

It’s important that phone numbers are represented consistently. If a number is represented differently depending on where the call is dialed from, it must be normalized first, for example by stripping the country code, the domestic prefix (here ‘0’), and any operator prefix. The anonymizer does not map two representations of the same number to the same string. Since cider expects to be told whether calls/texts are domestic or international, users will likely have examined number prefixes already, and perhaps removed them.
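One way such normalization might look, under the simplified Bangladeshi format used in this demo (`normalize_number` is a hypothetical helper, not part of cider):

```python
def normalize_number(raw: str) -> str:
    # Hypothetical normalizer: collapse common renderings of a number to one
    # canonical ten-digit form. Real data may need more care, e.g. a domestic
    # number that legitimately begins with "880" would be mangled here.
    x = raw.strip().replace(" ", "")
    if x.startswith("+"):
        x = x[1:]
    if x.startswith("880"):   # international country code
        x = x[3:]
    return x.lstrip("0")      # domestic trunk prefix

# All of these collapse to the same canonical string:
assert normalize_number("+880 1712345678") == "1712345678"
assert normalize_number("8801712345678") == "1712345678"
assert normalize_number("01712345678") == "1712345678"
```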

def is_valid_mobile_number(x):
    if any(not c.isdigit() for c in x):
        print(x, "has non-numeric characters. Please remove these.")
        return False
    if x[0] == "0":
        print(x, "starts with 0. Please strip leading zeros from numbers.")
        return False
    if len(x) != 10:
        print("Valid numbers have ten digits;", x, "has", len(x))
        return False
    return True

anonymizer_with_check = Anonymizer(datastore=datastore, format_checker=is_valid_mobile_number)
Loading CDR...
Loading recharges...
SUCCESS!
Loading mobile data...
Loading mobile money...

The synthetic data we are working with has not been processed to remove leading “880”s, “0”s, or “+”s, so when we try to anonymize the CDR data, an error is raised notifying the user that phone numbers haven’t been processed as expected.

print(datastore.cdr.toPandas().head())
anonymizer_with_check.anonymize_cdr()
/Users/mlevy/miniconda3/envs/myenv_x86/lib/python3.8/site-packages/pyspark/sql/pandas/conversion.py:248: FutureWarning: Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead
  series = series.astype(t, copy=False)
  txn_type       caller_id    recipient_id           timestamp  duration  \
0     call     00180479117     00186879451 2020-01-01 00:00:42     253.0   
1     text     00181616517     00185633871 2020-01-01 00:02:04       NaN   
2     text     00189653293     00186624501 2020-01-01 00:02:12       NaN   
3     call     00183331706  +8800186773263 2020-01-01 00:02:23      96.0   
4     text  +8800189721427     00188688509 2020-01-01 00:03:05       NaN   

  caller_antenna recipient_antenna international        day  
0           a101               a54      domestic 2020-01-01  
1            a44              a110      domestic 2020-01-01  
2           a145               a96      domestic 2020-01-01  
3            a84               a36      domestic 2020-01-01  
4           a261              a268      domestic 2020-01-01  
Valid numbers have ten digits; 8800180789080 has 13
00180479117 starts with 0. Please strip leading zeros from numbers.
23/05/24 13:55:20 ERROR Executor: Exception in task 0.0 in stage 49.0 (TID 45)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/mlevy/Dropbox/professional/givedirectly/cider/cider/anonymizer.py", line 142, in <lambda>
    lambda raw: Anonymizer._check_identifier_format_and_hash(raw, encoder, format_checker), StringType()
  File "/Users/mlevy/Dropbox/professional/givedirectly/cider/cider/anonymizer.py", line 197, in _check_identifier_format_and_hash
    raise ValueError(f'Bad input to anonymization: {raw_string} rejected by provided format format_checker.')
ValueError: Bad input to anonymization: 00180479117 rejected by provided format format_checker.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
23/05/24 13:55:20 ERROR Executor: Exception in task 1.0 in stage 49.0 (TID 46)
[... identical traceback for 8800180789080, elided ...]
23/05/24 13:55:20 ERROR TaskSetManager: Task 0 in stage 49.0 failed 1 times; aborting job
---------------------------------------------------------------------------
PythonException                           Traceback (most recent call last)
Cell In[11], line 2
      1 print(datastore.cdr.toPandas().head())
----> 2 anonymizer_with_check.anonymize_cdr()

File ~/Dropbox/professional/givedirectly/cider/cider/anonymizer.py:99, in Anonymizer.anonymize_cdr(self)
     98 def anonymize_cdr(self):
---> 99     self._anonymize_dataset('cdr', ['caller_id', 'recipient_id'])

File ~/Dropbox/professional/givedirectly/cider/cider/anonymizer.py:157, in Anonymizer._anonymize_dataset(self, dataset_name, column_names)
    154 if 'day' in dataset_with_anonymized_columns.columns:
    155     dataset_with_anonymized_columns = dataset_with_anonymized_columns.drop(col('day'))
--> 157 save_df(dataset_with_anonymized_columns, self.outputs_path / 'outputs' / f'{dataset_name}.csv')

File ~/Dropbox/professional/givedirectly/cider/helpers/utils.py:112, in save_df(df, out_file_path, sep)
    107 temp_folder = out_file_path.parent / 'temp'
    109 # Ask spark to write output there. The repartition(1) call will tell spark to write a single file.
    110 # It will name it with some meaningless partition name, but we can find it easily bc it's the only
    111 # csv in the temp directory.
--> 112 df.repartition(1).write.csv(path=str(temp_folder), mode="overwrite", header="true", sep=sep)
    113 spark_generated_file_name = [
    114     fname for fname in os.listdir(temp_folder) if os.path.splitext(fname)[1] == '.csv'
    115 ][0]
    117 # move the file out of the temporary directory and rename it

File ~/miniconda3/envs/myenv_x86/lib/python3.8/site-packages/pyspark/sql/readwriter.py:1240, in DataFrameWriter.csv(self, path, mode, compression, sep, quote, escape, header, nullValue, escapeQuotes, quoteAll, dateFormat, timestampFormat, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping, encoding, emptyValue, lineSep)
   1221 self.mode(mode)
   1222 self._set_opts(
   1223     compression=compression,
   1224     sep=sep,
   (...)
   1238     lineSep=lineSep,
   1239 )
-> 1240 self._jwrite.csv(path)

File ~/miniconda3/envs/myenv_x86/lib/python3.8/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File ~/miniconda3/envs/myenv_x86/lib/python3.8/site-packages/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
    192 converted = convert_exception(e.java_exception)
    193 if not isinstance(converted, UnknownException):
    194     # Hide where the exception came from that shows a non-Pythonic
    195     # JVM exception message.
--> 196     raise converted from None
    197 else:
    198     raise

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/mlevy/Dropbox/professional/givedirectly/cider/cider/anonymizer.py", line 142, in <lambda>
    lambda raw: Anonymizer._check_identifier_format_and_hash(raw, encoder, format_checker), StringType()
  File "/Users/mlevy/Dropbox/professional/givedirectly/cider/cider/anonymizer.py", line 197, in _check_identifier_format_and_hash
    raise ValueError(f'Bad input to anonymization: {raw_string} rejected by provided format format_checker.')
ValueError: Bad input to anonymization: 00180479117 rejected by provided format format_checker.